In [None]:
! pip install pyspark
! pip install avro

In [1]:
import os
from pathlib import Path

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, TimestampType, DoubleType, IntegerType

In [4]:
# Create (or reuse) the SparkSession that the loader functions below read through.
# NOTE(review): the recorded output of this cell is JAVA_GATEWAY_EXITED, i.e. the JVM
# never started -- usually a missing Java runtime / unset JAVA_HOME; confirm in this env.
spark = (
    SparkSession.builder
    .appName("DataPipeline")
    .getOrCreate()
)

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [16]:
# Directory holding the raw dataset files: the notebook's working directory.
# (The previous `Path().parent.absolute()` resolved to the same thing -- `.parent`
# of `Path('.')` is `Path('.')` -- but read as if it meant the parent directory.)
BASE_DIR = Path.cwd()
BASE_DIR

PosixPath('/content')

In [17]:
# One list of dataset file names per entity; filled by the discovery loop.
customer_files, delivery_files, orders_files = [], [], []

In [18]:
# Collect the CSV/JSON dataset files in BASE_DIR, grouped by entity (matched on a
# case-insensitive substring of the file name; first match wins).
# The lists are rebuilt here so that re-running this cell does not append duplicates.
customer_files, delivery_files, orders_files = [], [], []

# sorted() makes the order deterministic; os.listdir order is arbitrary and the
# preview cell below only looks at the first customers file.
for path in sorted(os.listdir(BASE_DIR)):
    # Require a real extension ('.csv'), not merely a name ending in 'csv'.
    if not path.endswith(('.csv', '.json')):
        continue
    name = path.lower()
    if 'customers' in name:
        customer_files.append(path)
    elif 'orders' in name:
        orders_files.append(path)
    elif 'deliveries' in name:
        delivery_files.append(path)


print(f"Customers files: {len(customer_files)}")
print(f"Delivery files: {len(delivery_files)}")
print(f"Orders files: {len(orders_files)}")

Customers files: 1
Delivery files: 1
Orders files: 1


In [8]:
# Explicit schemas for the raw datasets (CSV or JSON). The loaders apply them only
# when called with dynamic_schema=True; with dynamic_schema=False Spark infers columns.
# NOTE(review): Spark generally relaxes user-supplied schemas to nullable when reading
# files, so nullable=False below documents intent rather than enforcing it -- verify
# for the Spark version in use.
customers_schema = StructType([
    StructField("Customer ID", StringType(), nullable=False),
    StructField("Last Used Platform", StringType(), nullable=False),
    # IntegerType rather than BooleanType: the inferred preview of this column shows
    # numeric values (0), which Spark will not coerce to boolean -- under BooleanType
    # they would load as null. Switch back if the source ever emits true/false.
    StructField("Is Blocked", IntegerType(), nullable=False),
    StructField("Created At", StringType(), nullable=False),
    StructField("Language", StringType(), nullable=False),
    StructField("Outstanding Amount", DoubleType(), nullable=False),
    StructField("Loyalty Points", IntegerType(), nullable=False),
    StructField("Number of employees", IntegerType(), nullable=True),
])



# Schema for the orders dataset: (column name, Spark type) in file order.
# Every column is declared nullable.
orders_schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in [
        ("Order ID", StringType()),
        ("Order Status", StringType()),
        ("Category Name", StringType()),
        ("SKU", StringType()),
        ("Customization Group", StringType()),
        ("Customization Option", StringType()),
        ("Quantity", IntegerType()),
        ("Unit Price", DoubleType()),
        ("Cost Price", DoubleType()),
        ("Total Cost Price", DoubleType()),
        ("Total Price", DoubleType()),
        ("Order Total", DoubleType()),
        ("Sub Total", DoubleType()),
        ("Tax", DoubleType()),
        ("Delivery Charge", DoubleType()),
        ("Tip", DoubleType()),
        ("Discount", DoubleType()),
        ("Remaining Balance", DoubleType()),
        ("Payment Method", StringType()),
        ("Additional Charge", DoubleType()),
        ("Taxable Amount", DoubleType()),
        ("Transaction ID", StringType()),
        ("Currency Symbol", StringType()),
        ("Transaction Status", StringType()),
        ("Promo Code", StringType()),
        ("Customer ID", StringType()),
        ("Merchant ID", StringType()),
        ("Description", StringType()),
        ("Distance (in km)", DoubleType()),
        ("Order Time", TimestampType()),
        ("Pickup Time", TimestampType()),
        ("Delivery Time", TimestampType()),
        ("Ratings", IntegerType()),
        ("Reviews", StringType()),
        ("Merchant Earning", DoubleType()),
        ("Commission Amount", DoubleType()),
        ("Commission Payout Status", StringType()),
        ("Order Preparation Time", IntegerType()),
        ("Debt Amount", DoubleType()),
        ("Redeemed Loyalty Points", IntegerType()),
        ("Consumed Loyalty Points", IntegerType()),
        ("Cancellation Reason", StringType()),
        ("Flat Discount", DoubleType()),
        ("Checkout Template Name", StringType()),
        ("Checkout Template Value", StringType()),
    ]
])


# Schema for the deliveries dataset: (column name, Spark type) in file order.
# Every column is declared nullable.
deliveries_schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in [
        ("Task_ID", StringType()),
        ("Order_ID", StringType()),
        ("Relationship", StringType()),
        ("Team_Name", StringType()),
        ("Task_Type", StringType()),
        ("Notes", StringType()),
        ("Agent_ID", StringType()),
        ("Agent_Name", StringType()),
        ("Distance(m)", DoubleType()),
        ("Total_Time_Taken(min)", DoubleType()),
        ("Task_Status", StringType()),
        ("Ref_Images", StringType()),
        ("Rating", IntegerType()),
        ("Review", StringType()),
        ("Latitude", DoubleType()),
        ("Longitude", DoubleType()),
        ("Tags", StringType()),
        ("Promo_Applied", StringType()),
        ("Custom_Template_ID", StringType()),
        ("Task_Details_QTY", IntegerType()),
        ("Task_Details_AMOUNT", DoubleType()),
        ("Special_Instructions", StringType()),
        ("Tip", DoubleType()),
        ("Delivery_Charges", DoubleType()),
        ("Discount", DoubleType()),
        ("Subtotal", DoubleType()),
        ("Payment_Type", StringType()),
        ("Task_Category", StringType()),
        ("Earning", DoubleType()),
        ("Pricing", StringType()),
    ]
])

In [27]:
def json_file_loader(path, schema, dynamic_schema):
    """Load a JSON file located in BASE_DIR into a Spark DataFrame.

    Args:
        path: File name relative to BASE_DIR.
        schema: StructType applied when ``dynamic_schema`` is truthy.
        dynamic_schema: If truthy, read with ``schema``; if falsy, ``schema`` is
            ignored and Spark infers the columns.
            NOTE(review): the flag name reads inverted relative to this behaviour;
            kept as-is for caller compatibility.

    Returns:
        The DataFrame, or None if reading failed (the error is printed).
    """
    try:
        file_path = os.path.join(BASE_DIR, path)
        # multiline lets a single JSON record span several lines
        # (e.g. a pretty-printed array) instead of requiring JSON Lines.
        reader = spark.read.option('multiline', 'true')
        if dynamic_schema:
            reader = reader.schema(schema)
        # Without an explicit schema the JSON source always infers one; the
        # CSV-only 'header'/'inferSchema' options previously passed had no effect.
        return reader.json(file_path)
    except Exception as e:
        # Keep the best-effort "None on failure" contract, but don't hide the cause.
        print(f"Could not load JSON file {path!r}: {e}")
        return None

In [28]:
def csv_file_loader(path, schema, dynamic_schema):
    """Load a CSV file (with a header row) located in BASE_DIR into a Spark DataFrame.

    Args:
        path: File name relative to BASE_DIR.
        schema: StructType applied when ``dynamic_schema`` is truthy.
        dynamic_schema: If truthy, read with ``schema``; if falsy, ``schema`` is
            ignored and Spark infers column types (``inferSchema``).
            NOTE(review): the flag name reads inverted relative to this behaviour;
            kept as-is for caller compatibility.

    Returns:
        The DataFrame, or None if reading failed (the error is printed).
    """
    try:
        file_path = os.path.join(BASE_DIR, path)
        # The first line holds column names.
        reader = spark.read.option('header', 'true')
        if dynamic_schema:
            reader = reader.schema(schema)
        else:
            reader = reader.option('inferSchema', 'true')
        return reader.csv(file_path)
    except Exception as e:
        # Keep the best-effort "None on failure" contract, but don't hide the cause.
        print(f"Could not load CSV file {path!r}: {e}")
        return None


In [26]:
def load_data_from_file(filename, schema, dynamic_schema):
    """Dispatch to the JSON or CSV loader based on the file extension.

    Args:
        filename: File name relative to BASE_DIR.
        schema: Forwarded unchanged to the loader.
        dynamic_schema: Forwarded unchanged to the loader.

    Returns:
        Whatever the chosen loader returns, or None when the extension is
        missing or is neither ``json`` nor ``csv``.
    """
    # Use the *last* extension, case-insensitively. `split('.')[1]` picked the wrong
    # token for names like 'orders.v2.csv' and raised IndexError when there was no dot.
    file_type = os.path.splitext(filename)[1].lstrip('.').lower()
    if file_type == 'json':
        return json_file_loader(filename, schema, dynamic_schema)
    if file_type == 'csv':
        return csv_file_loader(filename, schema, dynamic_schema)
    return None


In [29]:
# Preview the first customers file only. dynamic_schema=False means customers_schema
# is ignored and Spark infers the columns.
for file_name in customer_files[:1]:
    df = load_data_from_file(file_name, customers_schema, False)
    # The loaders return None on failure; test identity, not DataFrame truthiness.
    if df is not None:
        # show() prints the table itself and returns None, so don't wrap it in print().
        df.show(5)

+--------------------+-----------+----------+--------+------------------+--------------+-------------------+------------------+
|          Created At|Customer ID|Is Blocked|Language|Last Used Platform|Loyalty Points|Number of employees|Outstanding Amount|
+--------------------+-----------+----------+--------+------------------+--------------+-------------------+------------------+
|2021-03-15T17:13:...|    3144837|         0|      en|               WEB|             0|               NULL|                 0|
|2021-03-20T14:15:...|    3174590|         0|      en|               WEB|             0|               NULL|                 0|
|2021-03-21T15:36:...|    3181998|         0|      en|               WEB|             0|               NULL|                 0|
|2021-03-23T08:54:...|    3191244|         0|      en|               WEB|           367|               NULL|                 0|
|2021-04-06T13:52:...|    3274222|         0|      en|               WEB|             0|               N

In [None]:
# Shut down the SparkSession created at the top of the notebook. Run this last:
# the loader functions read through the global `spark` and cannot be used afterwards.
spark.stop()