In [1]:
import os
import sys

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
%matplotlib inline

In [2]:
warnings.filterwarnings("ignore")

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, desc, col, size, array_contains, isnan, udf, hour, array_min, array_max, countDistinct
from pyspark.sql.types import IntegerType, StringType, ArrayType, DoubleType, FloatType

In [4]:
# Initialize a spark session.
conf = pyspark.SparkConf().setMaster("local[*]")

def init_spark():
    spark = SparkSession.builder.appName("Pyspark guide").config(conf=conf).getOrCreate()
    return spark

spark = init_spark()

In [5]:
spark

In [6]:
input_path = "./data/Sales_Data.csv"

In [7]:
sales_data = spark.read.csv(input_path, header=True)

In [8]:
print("Data Schema")
sales_data.printSchema

Data Schema


<bound method DataFrame.printSchema of DataFrame[_c0: string, Order ID: string, Product: string, Quantity Ordered: string, Price Each: string, Order Date: string, Purchase Address: string, Month: string, Sales: string, City: string, Hour: string]>

In [9]:
print('Columns overview')
pd.DataFrame(sales_data.dtypes, columns = ['Column Name','Data type'])

Columns overview


Unnamed: 0,Column Name,Data type
0,_c0,string
1,Order ID,string
2,Product,string
3,Quantity Ordered,string
4,Price Each,string
5,Order Date,string
6,Purchase Address,string
7,Month,string
8,Sales,string
9,City,string


In [10]:
sales_data = sales_data.select("Order ID", "Product", "Quantity Ordered", "Price Each", "Order Date", "Purchase Address", \
                              "Month", "Sales", "City", "Hour")

In [11]:
print('Data frame describe (string and numeric columns only):')
sales_data.describe().toPandas()

Data frame describe (string and numeric columns only):


24/07/14 19:26:28 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,summary,Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address,Month,Sales,City,Hour
0,count,185950.0,185950,185950.0,185950.0,185950,185950,185950.0,185950.0,185950,185950.0
1,mean,230417.5693788653,,1.1243828986286637,184.39973476748676,,,7.059139553643453,185.49091675188888,,14.413304651788115
2,stddev,51512.73710999645,,0.4427926240286694,332.7313298843438,,,3.5029960006289547,332.9197713864798,,5.423415962073379
3,min,141234.0,20in Monitor,1.0,109.99,2019-01-01 03:07:00,"1 11th St, Atlanta, GA 30301",1.0,109.99,Atlanta,0.0
4,max,319670.0,iPhone,9.0,999.99,2020-01-01 05:13:00,"999 Wilson St, San Francisco, CA 94016",9.0,999.99,Seattle,9.0


In [12]:
print(f'There are total {sales_data.count()} row, Let print first 2 data rows:')
sales_data.limit(2).toPandas()

There are total 185950 row, Let print first 2 data rows:


Unnamed: 0,Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address,Month,Sales,City,Hour
0,295665,Macbook Pro Laptop,1,1700.0,2019-12-30 00:01:00,"136 Church St, New York City, NY 10001",12,1700.0,New York City,0
1,295666,LG Washing Machine,1,600.0,2019-12-29 07:03:00,"562 2nd St, New York City, NY 10001",12,600.0,New York City,7


In [13]:
pd.DataFrame(sales_data.dtypes, columns = ['Column Name','Data type'])

Unnamed: 0,Column Name,Data type
0,Order ID,string
1,Product,string
2,Quantity Ordered,string
3,Price Each,string
4,Order Date,string
5,Purchase Address,string
6,Month,string
7,Sales,string
8,City,string
9,Hour,string


In [14]:
string_columns = []
numeric_columns = []
array_columns = []

for field in sales_data.schema.fields:
    if isinstance(field.dataType, StringType):
        string_columns.append(field.name)
    elif isinstance(field.dataType, (IntegerType, DoubleType, FloatType)):
        numeric_columns.append(field.name)
    elif isinstance(field.dataType, ArrayType):
        array_columns.append(field.name)

In [15]:
@udf(IntegerType())
def count_missing(column, column_type):
    if column_type == "string":
        return 1 if column is None or column == "" else 0
    elif column_type == "numeric":
        return 1 if column == 0 or column is None or np.isnan(column) else 0
    elif column_type == "array":
        return sum(1 for elem in column if elem == 0 or np.isnan(elem))
    return 0

In [16]:
missing_values = {}

for column in sales_data.columns:
    if column in string_columns:
        column_type = "string"
    elif column in numeric_columns:
        column_type = "numeric"
    elif column in array_columns:
        column_type = "array"
    else:
        continue
    
    missing_count = sales_data.select(count_missing(col(column), lit(column_type)).alias('missing')).agg({"missing": "sum"}).collect()[0][0]
    missing_values[column] = missing_count

missing_df = pd.DataFrame([missing_values])

                                                                                

In [17]:
missing_df

Unnamed: 0,Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address,Month,Sales,City,Hour
0,0,0,0,0,0,0,0,0,0,0


In [18]:
sales_data = sales_data.select(['Order ID', 'Product', 'Quantity Ordered', 'Price Each', 'Order Date', 'Month', 'City', 'Hour'])

In [19]:
distinct_products = sales_data.groupby(sales_data.Product).count().orderBy("count", acsending = False).collect()

In [20]:
product_list = [{row['Product'] : row['count']} for row in distinct_products]
print(product_list)

[{'LG Dryer': 646}, {'LG Washing Machine': 666}, {'Vareebadd Phone': 2065}, {'20in Monitor': 4101}, {'ThinkPad Laptop': 4128}, {'Macbook Pro Laptop': 4724}, {'Flatscreen TV': 4800}, {'Google Phone': 5525}, {'34in Ultrawide Monitor': 6181}, {'27in 4K Gaming Monitor': 6230}, {'iPhone': 6842}, {'27in FHD Monitor': 7507}, {'Bose SoundSport Headphones': 13325}, {'Apple Airpods Headphones': 15549}, {'Wired Headphones': 18882}, {'AA Batteries (4-pack)': 20577}, {'AAA Batteries (4-pack)': 20641}, {'Lightning Charging Cable': 21658}, {'USB-C Charging Cable': 21903}]


In [46]:
sales_data.toPandas()

Unnamed: 0,Order ID,Product,Quantity Ordered,Price Each,Order Date,Month,City,Hour
0,295665,Macbook Pro Laptop,1,1700.0,2019-12-30 00:01:00,12,New York City,0
1,295666,LG Washing Machine,1,600.0,2019-12-29 07:03:00,12,New York City,7
2,295667,USB-C Charging Cable,1,11.95,2019-12-12 18:21:00,12,New York City,18
3,295668,27in FHD Monitor,1,149.99,2019-12-22 15:13:00,12,San Francisco,15
4,295669,USB-C Charging Cable,1,11.95,2019-12-18 12:38:00,12,Atlanta,12
...,...,...,...,...,...,...,...,...
185945,222905,AAA Batteries (4-pack),1,2.99,2019-06-07 19:02:00,6,Boston,19
185946,222906,27in FHD Monitor,1,149.99,2019-06-01 19:29:00,6,New York City,19
185947,222907,USB-C Charging Cable,1,11.95,2019-06-22 18:57:00,6,San Francisco,18
185948,222908,USB-C Charging Cable,1,11.95,2019-06-26 18:35:00,6,San Francisco,18
