In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

In [2]:
spark = SparkSession.builder.appName("SalesAnalytics").getOrCreate()

## Data Preparation

In [3]:
schema = StructType([
    StructField("Order ID", StringType(), True),
    StructField("Product", StringType(), True),
    StructField("Qty Ordered", StringType(), True),
    StructField("Price Each", StringType(), True),
    StructField("Order Date", StringType(), True),
    StructField("Purchase Address", StringType(), True)
])

In [4]:
sales_data_fpath = [
    "./data/salesdata/Sales_January_2019.csv",
    "./data/salesdata/Sales_February_2019.csv",
    "./data/salesdata/Sales_March_2019.csv",
    "./data/salesdata/Sales_April_2019.csv",
    "./data/salesdata/Sales_May_2019.csv",
    "./data/salesdata/Sales_June_2019.csv",
    "./data/salesdata/Sales_July_2019.csv",
    "./data/salesdata/Sales_August_2019.csv",
    "./data/salesdata/Sales_September_2019.csv",
    "./data/salesdata/Sales_October_2019.csv",
    "./data/salesdata/Sales_November_2019.csv",
    "./data/salesdata/Sales_December_2019.csv"]
sales_raw_df = spark.read.csv(sales_data_fpath, header = True, schema = schema)

In [5]:
sales_raw_df.show(10)

+--------+--------------------+-----------+----------+--------------+--------------------+
|Order ID|             Product|Qty Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+-----------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|          1|      1700|12/30/19 00:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|          1|     600.0|12/29/19 07:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|          1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|          1|    149.99|12/22/19 15:13|410 6th St, San F...|
|  295669|USB-C Charging Cable|          1|     11.95|12/18/19 12:38|43 Hill St, Atlan...|
|  295670|AA Batteries (4-p...|          1|      3.84|12/31/19 22:58|200 Jefferson St,...|
|  295671|USB-C Charging Cable|          1|     11.95|12/16/19 15:10|928 12th St, Port...|
|  295672|USB-C Charging Cable|          2|     11.95|12/13/19 09:29|813 Hickory St, D...|

In [6]:
sales_raw_df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Qty Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



## Data Preparation and Cleansing

In [7]:
from pyspark.sql.functions import col

In [8]:
sales_raw_df.filter(col("Order ID").isNull() == True).show(10)

+--------+-------+-----------+----------+----------+----------------+
|Order ID|Product|Qty Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+-----------+----------+----------+----------------+
|    NULL|   NULL|       NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|       NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|       NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|       NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|       NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|       NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|       NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|       NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|       NULL|      NULL|      NULL|            NULL|
|    NULL|   NULL|       NULL|      NULL|      NULL|            NULL|
+--------+-------+-----------+----------+----------+----------------+
only showing top 10 

In [9]:
sales_raw_df = sales_raw_df.na.drop("any")

In [10]:
sales_raw_df.filter(col("Order ID").isNull() == True).show(10)

+--------+-------+-----------+----------+----------+----------------+
|Order ID|Product|Qty Ordered|Price Each|Order Date|Purchase Address|
+--------+-------+-----------+----------+----------+----------------+
+--------+-------+-----------+----------+----------+----------------+



In [11]:
sales_raw_df.describe("Order ID","Product","Qty Ordered","Price Each","Order Date","Purchase Address").show()

+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|summary|          Order ID|     Product|       Qty Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+------------------+------------------+--------------+--------------------+
|  count|            186305|      186305|            186305|            186305|        186305|              186305|
|   mean| 230417.5693788653|        NULL|1.1243828986286637|184.39973476747707|          NULL|                NULL|
| stddev|51512.737109995265|        NULL|0.4427926240286704| 332.7313298843439|          NULL|                NULL|
|    min|            141234|20in Monitor|                 1|            109.99|01/01/19 03:07|1 11th St, Atlant...|
|    max|          Order ID|      iPhone|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+------------------+------------+------------------+------------

In [12]:
from pyspark.sql.functions import split

In [13]:
sales_raw_df.select("Purchase Address").show(10, truncate=False)

+-----------------------------------------+
|Purchase Address                         |
+-----------------------------------------+
|136 Church St, New York City, NY 10001   |
|562 2nd St, New York City, NY 10001      |
|277 Main St, New York City, NY 10001     |
|410 6th St, San Francisco, CA 94016      |
|43 Hill St, Atlanta, GA 30301            |
|200 Jefferson St, New York City, NY 10001|
|928 12th St, Portland, OR 97035          |
|813 Hickory St, Dallas, TX 75001         |
|718 Wilson St, Dallas, TX 75001          |
|77 7th St, Dallas, TX 75001              |
+-----------------------------------------+
only showing top 10 rows



In [14]:
sales_raw_df.select("Purchase Address", split(col("Purchase Address"), ",")).show(10, truncate=False)

+-----------------------------------------+---------------------------------------------+
|Purchase Address                         |split(Purchase Address, ,, -1)               |
+-----------------------------------------+---------------------------------------------+
|136 Church St, New York City, NY 10001   |[136 Church St,  New York City,  NY 10001]   |
|562 2nd St, New York City, NY 10001      |[562 2nd St,  New York City,  NY 10001]      |
|277 Main St, New York City, NY 10001     |[277 Main St,  New York City,  NY 10001]     |
|410 6th St, San Francisco, CA 94016      |[410 6th St,  San Francisco,  CA 94016]      |
|43 Hill St, Atlanta, GA 30301            |[43 Hill St,  Atlanta,  GA 30301]            |
|200 Jefferson St, New York City, NY 10001|[200 Jefferson St,  New York City,  NY 10001]|
|928 12th St, Portland, OR 97035          |[928 12th St,  Portland,  OR 97035]          |
|813 Hickory St, Dallas, TX 75001         |[813 Hickory St,  Dallas,  TX 75001]         |
|718 Wilso

In [15]:
sales_raw_df.select("Purchase Address", split(col("Purchase Address"), ",").getItem(1)).show(10, truncate=False)

+-----------------------------------------+---------------------------------+
|Purchase Address                         |split(Purchase Address, ,, -1)[1]|
+-----------------------------------------+---------------------------------+
|136 Church St, New York City, NY 10001   | New York City                   |
|562 2nd St, New York City, NY 10001      | New York City                   |
|277 Main St, New York City, NY 10001     | New York City                   |
|410 6th St, San Francisco, CA 94016      | San Francisco                   |
|43 Hill St, Atlanta, GA 30301            | Atlanta                         |
|200 Jefferson St, New York City, NY 10001| New York City                   |
|928 12th St, Portland, OR 97035          | Portland                        |
|813 Hickory St, Dallas, TX 75001         | Dallas                          |
|718 Wilson St, Dallas, TX 75001          | Dallas                          |
|77 7th St, Dallas, TX 75001              | Dallas              

In [16]:
sales_raw_df.select("Purchase Address", split(col("Purchase Address"), ",").getItem(2)).show(10, truncate=False)

+-----------------------------------------+---------------------------------+
|Purchase Address                         |split(Purchase Address, ,, -1)[2]|
+-----------------------------------------+---------------------------------+
|136 Church St, New York City, NY 10001   | NY 10001                        |
|562 2nd St, New York City, NY 10001      | NY 10001                        |
|277 Main St, New York City, NY 10001     | NY 10001                        |
|410 6th St, San Francisco, CA 94016      | CA 94016                        |
|43 Hill St, Atlanta, GA 30301            | GA 30301                        |
|200 Jefferson St, New York City, NY 10001| NY 10001                        |
|928 12th St, Portland, OR 97035          | OR 97035                        |
|813 Hickory St, Dallas, TX 75001         | TX 75001                        |
|718 Wilson St, Dallas, TX 75001          | TX 75001                        |
|77 7th St, Dallas, TX 75001              | TX 75001            

In [17]:
sales_raw_df.select("Purchase Address", split(split(col("Purchase Address"), ",").getItem(2), ' ')).show(10, truncate=False)

+-----------------------------------------+-----------------------------------------------+
|Purchase Address                         |split(split(Purchase Address, ,, -1)[2],  , -1)|
+-----------------------------------------+-----------------------------------------------+
|136 Church St, New York City, NY 10001   |[, NY, 10001]                                  |
|562 2nd St, New York City, NY 10001      |[, NY, 10001]                                  |
|277 Main St, New York City, NY 10001     |[, NY, 10001]                                  |
|410 6th St, San Francisco, CA 94016      |[, CA, 94016]                                  |
|43 Hill St, Atlanta, GA 30301            |[, GA, 30301]                                  |
|200 Jefferson St, New York City, NY 10001|[, NY, 10001]                                  |
|928 12th St, Portland, OR 97035          |[, OR, 97035]                                  |
|813 Hickory St, Dallas, TX 75001         |[, TX, 75001]                        

In [18]:
sales_raw_df.select("Purchase Address", split(split(col("Purchase Address"), ",").getItem(2), ' ').getItem(1)).show(10, truncate=False)

+-----------------------------------------+--------------------------------------------------+
|Purchase Address                         |split(split(Purchase Address, ,, -1)[2],  , -1)[1]|
+-----------------------------------------+--------------------------------------------------+
|136 Church St, New York City, NY 10001   |NY                                                |
|562 2nd St, New York City, NY 10001      |NY                                                |
|277 Main St, New York City, NY 10001     |NY                                                |
|410 6th St, San Francisco, CA 94016      |CA                                                |
|43 Hill St, Atlanta, GA 30301            |GA                                                |
|200 Jefferson St, New York City, NY 10001|NY                                                |
|928 12th St, Portland, OR 97035          |OR                                                |
|813 Hickory St, Dallas, TX 75001         |TX     

In [19]:
sales_extractcityandstate_df = (sales_raw_df.withColumn("City", split(col("Purchase Address"), ",").getItem(1))
                                .withColumn("State", split(split(col("Purchase Address"), ",").getItem(2), ' ').getItem(1)))

In [20]:
sales_extractcityandstate_df.show(10, truncate = False)

+--------+--------------------------+-----------+----------+--------------+-----------------------------------------+--------------+-----+
|Order ID|Product                   |Qty Ordered|Price Each|Order Date    |Purchase Address                         |City          |State|
+--------+--------------------------+-----------+----------+--------------+-----------------------------------------+--------------+-----+
|295665  |Macbook Pro Laptop        |1          |1700      |12/30/19 00:01|136 Church St, New York City, NY 10001   | New York City|NY   |
|295666  |LG Washing Machine        |1          |600.0     |12/29/19 07:03|562 2nd St, New York City, NY 10001      | New York City|NY   |
|295667  |USB-C Charging Cable      |1          |11.95     |12/12/19 18:21|277 Main St, New York City, NY 10001     | New York City|NY   |
|295668  |27in FHD Monitor          |1          |149.99    |12/22/19 15:13|410 6th St, San Francisco, CA 94016      | San Francisco|CA   |
|295669  |USB-C Charging Ca

In [23]:
flight_file = './data/flights/flight-summary.csv'
flight_summary_df = (spark.read.format("csv")
                     .option("header", True)
                     .option("inferSchema", True)
                     .load(flight_file))

In [24]:
flight_summary_df.show(10, truncate = False)

+-----------+------------------------------------------------+------------+------------+---------+--------------------------------------------+----------------+----------+-----+
|origin_code|origin_airport                                  |origin_city |origin_state|dest_code|dest_airport                                |dest_city       |dest_state|count|
+-----------+------------------------------------------------+------------+------------+---------+--------------------------------------------+----------------+----------+-----+
|BQN        |Rafael Hernández Airport                        |Aguadilla   |PR          |MCO      |Orlando International Airport               |Orlando         |FL        |441  |
|PHL        |Philadelphia International Airport              |Philadelphia|PA          |MCO      |Orlando International Airport               |Orlando         |FL        |4869 |
|MCI        |Kansas City International Airport               |Kansas City |MO          |IAH      |George Bush 

In [25]:
flight_summary_df.count()

4693

In [27]:
flight_summary_df.printSchema()

root
 |-- origin_code: string (nullable = true)
 |-- origin_airport: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- origin_state: string (nullable = true)
 |-- dest_code: string (nullable = true)
 |-- dest_airport: string (nullable = true)
 |-- dest_city: string (nullable = true)
 |-- dest_state: string (nullable = true)
 |-- count: integer (nullable = true)



In [28]:
flight_summary_df = flight_summary_df.withColumnRenamed("count","flight_count")

In [29]:
flight_summary_df.show(10, truncate = False)

+-----------+------------------------------------------------+------------+------------+---------+--------------------------------------------+----------------+----------+------------+
|origin_code|origin_airport                                  |origin_city |origin_state|dest_code|dest_airport                                |dest_city       |dest_state|flight_count|
+-----------+------------------------------------------------+------------+------------+---------+--------------------------------------------+----------------+----------+------------+
|BQN        |Rafael Hernández Airport                        |Aguadilla   |PR          |MCO      |Orlando International Airport               |Orlando         |FL        |441         |
|PHL        |Philadelphia International Airport              |Philadelphia|PA          |MCO      |Orlando International Airport               |Orlando         |FL        |4869        |
|MCI        |Kansas City International Airport               |Kansas City |

In [30]:
from pyspark.sql.functions import count, countDistinct

In [31]:
flight_summary_df.select(countDistinct("origin_airport"), countDistinct("dest_airport"), count("*")).show()

+------------------------------+----------------------------+--------+
|count(DISTINCT origin_airport)|count(DISTINCT dest_airport)|count(1)|
+------------------------------+----------------------------+--------+
|                           322|                         322|    4693|
+------------------------------+----------------------------+--------+



In [32]:
from pyspark.sql.functions import min, max, sum, sumDistinct, avg

In [33]:
flight_summary_df.select(min("flight_count"), max("flight_count")).show()

+-----------------+-----------------+
|min(flight_count)|max(flight_count)|
+-----------------+-----------------+
|                1|            13744|
+-----------------+-----------------+



In [34]:
flight_summary_df.select(sum("flight_count")).show()

+-----------------+
|sum(flight_count)|
+-----------------+
|          5332914|
+-----------------+



In [35]:
flight_summary_df.groupBy("origin_airport").count().orderBy("count", ascending = False).show(10)

+--------------------+-----+
|      origin_airport|count|
+--------------------+-----+
|Hartsfield-Jackso...|  169|
|Chicago O'Hare In...|  162|
|Dallas/Fort Worth...|  148|
|Denver Internatio...|  139|
|Minneapolis-Saint...|  120|
|George Bush Inter...|  119|
|Detroit Metropoli...|  112|
|Salt Lake City In...|   89|
|Newark Liberty In...|   88|
|San Francisco Int...|   80|
+--------------------+-----+
only showing top 10 rows



In [38]:
(flight_summary_df.groupBy("origin_airport").agg(max("flight_count").alias("max_flight_count")).orderBy("max_flight_count", ascending = False)).show(10)

+--------------------+----------------+
|      origin_airport|max_flight_count|
+--------------------+----------------+
|San Francisco Int...|           13744|
|Los Angeles Inter...|           13457|
|John F. Kennedy I...|           12016|
|McCarran Internat...|            9715|
|LaGuardia Airport...|            9639|
|Chicago O'Hare In...|            9575|
|     Kahului Airport|            8313|
|Honolulu Internat...|            8282|
|Hartsfield-Jackso...|            8234|
|Orlando Internati...|            8202|
+--------------------+----------------+
only showing top 10 rows



In [41]:
(flight_summary_df.groupBy("origin_state","origin_city").count().where(col("origin_state")=="CA").orderBy("count",ascending=False)).show(10)

+------------+-------------+-----+
|origin_state|  origin_city|count|
+------------+-------------+-----+
|          CA|San Francisco|   80|
|          CA|  Los Angeles|   80|
|          CA|    San Diego|   47|
|          CA|      Oakland|   35|
|          CA|   Sacramento|   27|
|          CA|     San Jose|   25|
|          CA|    Santa Ana|   22|
|          CA|      Ontario|   14|
|          CA|   Long Beach|   12|
|          CA| Palm Springs|   12|
+------------+-------------+-----+
only showing top 10 rows

