In [0]:
df = spark.read.csv("dbfs:/FileStore/shared_uploads/ashish258kothari@gmail.com/sales_data_.csv", header = True)

In [0]:
df.show(4)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700| 12/30/19 0:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|       600| 12/29/19 7:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 4 rows



In [0]:
df.describe().show()

+-------+-----------------+------------+------------------+------------------+--------------+--------------------+
|summary|         Order ID|     Product|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+-----------------+------------+------------------+------------------+--------------+--------------------+
|  count|           186850|      186850|            186850|            186850|        186850|              186850|
|   mean|230417.5693788653|        null|1.1243828986286637|184.39973476749515|          null|                null|
| stddev|51512.73710999598|        null|  0.44279262402867|332.73132988434395|          null|                null|
|    min|           141234|20in Monitor|                 1|            109.99|01/01/19 10:02|1 11th St, Atlant...|
|    max|             null|        null|              null|              null|          null|                null|
+-------+-----------------+------------+------------------+------------------+--

In [0]:
df.printSchema()

root
 |-- Order ID: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Quantity Ordered: string (nullable = true)
 |-- Price Each: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Purchase Address: string (nullable = true)



Spark => Dataframe
Spark Functions => to clean
Spark => SQL(to clean and transform)
Spark SQL => component pyspark => we can use sql queries to interact with data


-- sql => Dataframe => SQL table store => Database
-- Database => Tables.  (already created dataframe)

# Creating Database for SQL queries

In [0]:
spark.sql("drop database if exists sales_db cascade")

Out[10]: DataFrame[]

In [0]:
spark.sql("create database if not exists sales_db")
spark.sql("use sales_db")

Out[11]: DataFrame[]

## Using the same database for all transactions

In [0]:
spark.sql("""  create table if not exists sales(
            OrderId string,
            Product string,
            QuantityOrdered string,
            PriceEach string,
            OrderDate string,
            PurchaseAddress string
)""")

Out[14]: DataFrame[]

In [0]:
spark.sql("select * from sales").show()

+-------+-------+---------------+---------+---------+---------------+
|OrderId|Product|QuantityOrdered|PriceEach|OrderDate|PurchaseAddress|
+-------+-------+---------------+---------+---------+---------------+
+-------+-------+---------------+---------+---------+---------------+



view => virtual table (store the table query)

In [0]:
df.show(2)

+--------+------------------+----------------+----------+-------------+--------------------+
|Order ID|           Product|Quantity Ordered|Price Each|   Order Date|    Purchase Address|
+--------+------------------+----------------+----------+-------------+--------------------+
|  295665|Macbook Pro Laptop|               1|      1700|12/30/19 0:01|136 Church St, Ne...|
|  295666|LG Washing Machine|               1|       600|12/29/19 7:03|562 2nd St, New Y...|
+--------+------------------+----------------+----------+-------------+--------------------+
only showing top 2 rows



In [0]:
df.createOrReplaceTempView("temp_sales")

In [0]:
spark.sql("select * from temp_sales").show(4)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700| 12/30/19 0:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|       600| 12/29/19 7:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
|  295668|    27in FHD Monitor|               1|    149.99|12/22/19 15:13|410 6th St, San F...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 4 rows



## Inserting record using the subquery - into sales_raw table

In [0]:
spark.sql(""" insert overwrite sales
              select * from temp_sales """)

Out[19]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
spark.sql("select * from sales").show(2)

+-------+------------------+---------------+---------+-------------+--------------------+
|OrderId|           Product|QuantityOrdered|PriceEach|    OrderDate|     PurchaseAddress|
+-------+------------------+---------------+---------+-------------+--------------------+
| 295665|Macbook Pro Laptop|              1|     1700|12/30/19 0:01|136 Church St, Ne...|
| 295666|LG Washing Machine|              1|      600|12/29/19 7:03|562 2nd St, New Y...|
+-------+------------------+---------------+---------+-------------+--------------------+
only showing top 2 rows



In [0]:
%sql

select * from sales where orderId != 'null'

OrderId,Product,QuantityOrdered,PriceEach,OrderDate,PurchaseAddress
207486,Lightning Charging Cable,1,14.95,05/10/19 0:24,"88 Lincoln St, Austin, TX 73301"
207487,AA Batteries (4-pack),3,3.84,05/24/19 15:13,"522 Walnut St, Seattle, WA 98101"
207488,Google Phone,1,600,05/31/19 14:30,"83 12th St, San Francisco, CA 94016"
207488,USB-C Charging Cable,1,11.95,05/31/19 14:30,"83 12th St, San Francisco, CA 94016"
207489,USB-C Charging Cable,1,11.95,05/24/19 8:16,"105 Hill St, San Francisco, CA 94016"
207490,Lightning Charging Cable,2,14.95,05/18/19 8:03,"260 2nd St, Dallas, TX 75001"
207491,27in FHD Monitor,1,149.99,05/24/19 12:22,"768 8th St, Portland, OR 97035"
207492,Wired Headphones,2,11.99,05/12/19 2:36,"456 Walnut St, San Francisco, CA 94016"
207493,Wired Headphones,1,11.99,05/08/19 7:25,"220 Highland St, Los Angeles, CA 90001"
207494,USB-C Charging Cable,3,11.95,05/22/19 23:03,"620 10th St, New York City, NY 10001"


# Data Cleansing for the Project

(Bad data) record which are bad for quality analysis - Have a look on the same

In [0]:
%sql

select * from sales 
where orderId = 'null'
limit 5;

OrderId,Product,QuantityOrdered,PriceEach,OrderDate,PurchaseAddress
,,,,,
,,,,,
,,,,,
,,,,,
,,,,,


In [0]:
df.filter( df["Order ID"] != 'null' ).show(3)

+--------+--------------------+----------------+----------+--------------+--------------------+
|Order ID|             Product|Quantity Ordered|Price Each|    Order Date|    Purchase Address|
+--------+--------------------+----------------+----------+--------------+--------------------+
|  295665|  Macbook Pro Laptop|               1|      1700| 12/30/19 0:01|136 Church St, Ne...|
|  295666|  LG Washing Machine|               1|       600| 12/29/19 7:03|562 2nd St, New Y...|
|  295667|USB-C Charging Cable|               1|     11.95|12/12/19 18:21|277 Main St, New ...|
+--------+--------------------+----------------+----------+--------------+--------------------+
only showing top 3 rows



In [0]:
df.filter( df["Order ID"] != 'null' ).describe().show()

+-------+-----------------+------------+------------------+------------------+--------------+--------------------+
|summary|         Order ID|     Product|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+-----------------+------------+------------------+------------------+--------------+--------------------+
|  count|           186305|      186305|            186305|            186305|        186305|              186305|
|   mean|230417.5693788653|        null|1.1243828986286637|184.39973476749515|          null|                null|
| stddev|51512.73710999598|        null|  0.44279262402867|332.73132988434395|          null|                null|
|    min|           141234|20in Monitor|                 1|            109.99|01/01/19 10:02|1 11th St, Atlant...|
|    max|         Order ID|      iPhone|  Quantity Ordered|        Price Each|    Order Date|    Purchase Address|
+-------+-----------------+------------+------------------+------------------+--

In [0]:
%sql

select * from sales 
where orderId = 'Order ID';

OrderId,Product,QuantityOrdered,PriceEach,OrderDate,PurchaseAddress
Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address
Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address
Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address
Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address
Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address
Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address
Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address
Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address
Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address
Order ID,Product,Quantity Ordered,Price Each,Order Date,Purchase Address


# Way to filter bad data [Name value, orderID in data]

In [0]:
%sql

select * from sales 
where orderId != 'Order ID' and orderId != 'null';

OrderId,Product,QuantityOrdered,PriceEach,OrderDate,PurchaseAddress
207486,Lightning Charging Cable,1,14.95,05/10/19 0:24,"88 Lincoln St, Austin, TX 73301"
207487,AA Batteries (4-pack),3,3.84,05/24/19 15:13,"522 Walnut St, Seattle, WA 98101"
207488,Google Phone,1,600.0,05/31/19 14:30,"83 12th St, San Francisco, CA 94016"
207488,USB-C Charging Cable,1,11.95,05/31/19 14:30,"83 12th St, San Francisco, CA 94016"
207489,USB-C Charging Cable,1,11.95,05/24/19 8:16,"105 Hill St, San Francisco, CA 94016"
207490,Lightning Charging Cable,2,14.95,05/18/19 8:03,"260 2nd St, Dallas, TX 75001"
207491,27in FHD Monitor,1,149.99,05/24/19 12:22,"768 8th St, Portland, OR 97035"
207492,Wired Headphones,2,11.99,05/12/19 2:36,"456 Walnut St, San Francisco, CA 94016"
207493,Wired Headphones,1,11.99,05/08/19 7:25,"220 Highland St, Los Angeles, CA 90001"
207494,USB-C Charging Cable,3,11.95,05/22/19 23:03,"620 10th St, New York City, NY 10001"


# Creating temp table to check wheter we get data perfectly or not

In [0]:
%sql

with tmp_sale as (
  select * from sales  
  where orderId != 'null' and orderId != 'Order ID'
)


select * from tmp_sale
where orderId = 'null';

OrderId,Product,QuantityOrdered,PriceEach,OrderDate,PurchaseAddress


## Split and checking the columns - With filter

In [0]:
%sql

select trim(split(PurchaseAddress,',')[1]) as city
        from sales 
          where orderId != 'Order ID' and orderId != 'null';

city
Austin
Seattle
San Francisco
San Francisco
San Francisco
Dallas
Portland
San Francisco
Los Angeles
New York City


In [0]:
%sql

select split(PurchaseAddress,',')[1] as city,
            split(PurchaseAddress,',')[2] as State
        from sales 
          where orderId != 'Order ID' and orderId != 'null';

city,State
Austin,TX 73301
Seattle,WA 98101
San Francisco,CA 94016
San Francisco,CA 94016
San Francisco,CA 94016
Dallas,TX 75001
Portland,OR 97035
San Francisco,CA 94016
Los Angeles,CA 90001
New York City,NY 10001


# Getting and Preparing column as per the requirement

In [0]:
%sql
select 
  cast(OrderId as int) as OrderId,
  product,
  cast(QuantityOrdered as int) as QuantityOrdered,
  cast(PriceEach as int) as PriceEach,
  to_timestamp(OrderDate, 'MM/dd/yy H:mm') as OrderDate,
  PurchaseAddress,
  substr(split(PurchaseAddress, ',')[1],2) as City,
  substr(split(PurchaseAddress, ',')[2],2,2) as State,
  year(to_timestamp(OrderDate, 'MM/dd/yy H:mm')) as ReportYear,
  month(to_timestamp(OrderDate, 'MM/dd/yy H:mm')) as Month
from sales
where orderID != "Order ID" and orderid != "null"
limit 3


OrderId,product,QuantityOrdered,PriceEach,OrderDate,PurchaseAddress,City,State,ReportYear,Month
207486,Lightning Charging Cable,1,14,2019-05-10T00:24:00.000+0000,"88 Lincoln St, Austin, TX 73301",Austin,TX,2019,5
207487,AA Batteries (4-pack),3,3,2019-05-24T15:13:00.000+0000,"522 Walnut St, Seattle, WA 98101",Seattle,WA,2019,5
207488,Google Phone,1,600,2019-05-31T14:30:00.000+0000,"83 12th St, San Francisco, CA 94016",San Francisco,CA,2019,5


In [0]:
from pyspark.sql.functions import col, to_timestamp, year, month, split, substring

df_filtered = df.filter(
    (col("Order ID") != "Order ID") & 
    (col("Order ID") != "null")
)

df_transformed = df_filtered.select(
    col("Order ID").cast("int").alias("OrderId"),
    col("Product"),
    col("Quantity Ordered").cast("int").alias("QuantityOrdered"),
    col("Price Each").cast("int").alias("PriceEach"),
    to_timestamp(col("Order Date"), "MM/dd/yy H:mm").alias("OrderDate"),
    col("Purchase Address"),
    substring(split(col("Purchase Address"), ",")[1], 2, 100).alias("City"),
    substring(split(col("Purchase Address"), ",")[2], 2, 2).alias("State"),
    year(to_timestamp(col("Order Date"), "MM/dd/yy H:mm")).alias("ReportYear"),
    month(to_timestamp(col("Order Date"), "MM/dd/yy H:mm")).alias("Month")
)

df_transformed.show(3, truncate=False)


+-------+--------------------+---------------+---------+-------------------+--------------------------------------+-------------+-----+----------+-----+
|OrderId|Product             |QuantityOrdered|PriceEach|OrderDate          |Purchase Address                      |City         |State|ReportYear|Month|
+-------+--------------------+---------------+---------+-------------------+--------------------------------------+-------------+-----+----------+-----+
|295665 |Macbook Pro Laptop  |1              |1700     |2019-12-30 00:01:00|136 Church St, New York City, NY 10001|New York City|NY   |2019      |12   |
|295666 |LG Washing Machine  |1              |600      |2019-12-29 07:03:00|562 2nd St, New York City, NY 10001   |New York City|NY   |2019      |12   |
|295667 |USB-C Charging Cable|1              |11       |2019-12-12 18:21:00|277 Main St, New York City, NY 10001  |New York City|NY   |2019      |12   |
+-------+--------------------+---------------+---------+-------------------+------