# Question 2


## Getting the data from the csv file

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions

spark = SparkSession.builder.appName("Datacamp Pyspark Tutorial").config("spark.memory.offHeap.enabled","true").config("spark.memory.offHeap.size","10g").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/06 13:12:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [35]:
spark.sparkContext.setLogLevel("ERROR")

In [43]:
df = spark.read.csv('data.csv',header=True)

In [45]:
df.show(5,0)

+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+--------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/2010 8:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/2010 8:26|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
+---------+---------+-----------------------------------

## Cleaning the Data

In [57]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [59]:
# Correct data types
# Converting InvoiceDate from String to Datetime, Quantity and UnitPrice to Interger
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
df2 = df.withColumn("Quantity",col("Quantity").cast(IntegerType())) \
    .withColumn("UnitPrice",col("UnitPrice").cast(IntegerType())) \
    .withColumn('InvoiceDate',to_timestamp("InvoiceDate", 'MM/dd/YY HH:mm'))

# I am not filtering out rows with null values for the CustomerId as they could represent actual Sales but for non registered customers.

df2.printSchema()

df2.show(5,0)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: integer (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |2009-12-27 08:26:00|2        |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |2009-12-27 08:26:00|3        |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT

## Sales and orders per day

In [91]:
df2 = df2.withColumn('Sales', col("Quantity") * col("UnitPrice")) \
    .withColumn("InvoiceDay",col("InvoiceDate").cast(DateType()))

df2.groupBy('InvoiceDay').agg(sum('Sales').alias('Sales'),countDistinct('InvoiceNo').alias('Orders')).show()




+----------+-------+------+
|InvoiceDay|  Sales|Orders|
+----------+-------+------+
|2009-12-27| 568018|  2025|
|2010-12-26|6491074| 23875|
+----------+-------+------+



                                                                                

## Top 10 Products for each month

In [98]:
df2.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----+----------+----+-----+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|Sales|InvoiceDay|year|month|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+-----+----------+----+-----+
|   536365|   85123A|WHITE HANGING HEA...|       6|2009-12-27 08:26:00|        2|     17850|United Kingdom|   12|2009-12-27|2009|   12|
|   536365|    71053| WHITE METAL LANTERN|       6|2009-12-27 08:26:00|        3|     17850|United Kingdom|   18|2009-12-27|2009|   12|
|   536365|   84406B|CREAM CUPID HEART...|       8|2009-12-27 08:26:00|        2|     17850|United Kingdom|   16|2009-12-27|2009|   12|
|   536365|   84029G|KNITTED UNION FLA...|       6|2009-12-27 08:26:00|        3|     17850|United Kingdom|   18|2009-12-27|2009|   12|
|   536365|   84029E|RED WOOLLY HOTTIE...|      

In [116]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number

df2 = df2.withColumn('year', year(col("InvoiceDate"))) \
    .withColumn('month', month(col("InvoiceDate")))

monthly_sales = df2.groupBy("year","month","StockCode","Description").agg(sum("Sales").alias("monthlySales"))
w = Window.partitionBy("year","month").orderBy(col("monthlySales").desc())
r = monthly_sales.withColumn("rank",row_number().over(w))

top10 = r.filter(col("rank") <= 10)
top10.show(50)



+----+-----+---------+--------------------+------------+----+
|year|month|StockCode|         Description|monthlySales|rank|
+----+-----+---------+--------------------+------------+----+
|2009|   12|    22423|REGENCY CAKESTAND...|       25237|   1|
|2009|   12|      DOT|      DOTCOM POSTAGE|       24641|   2|
|2009|   12|   84029E|RED WOOLLY HOTTIE...|        8140|   3|
|2009|   12|    22114|HOT WATER BOTTLE ...|        7492|   4|
|2009|   12|    21479|WHITE SKULL HOT W...|        7487|   5|
|2009|   12|    22086|PAPER CHAIN KIT 5...|        7348|   6|
|2009|   12|   85123A|WHITE HANGING HEA...|        6871|   7|
|2009|   12|    79321|       CHILLI LIGHTS|        6790|   8|
|2009|   12|    22112|CHOCOLATE HOT WAT...|        6608|   9|
|2009|   12|    21623|VINTAGE UNION JAC...|        6567|  10|
|2010|   12|      DOT|      DOTCOM POSTAGE|      181255|   1|
|2010|   12|    22423|REGENCY CAKESTAND...|      128147|   2|
|2010|   12|    47566|       PARTY BUNTING|       86471|   3|
|2010|  

                                                                                

## Writing the DataFrame in the db

In [None]:
# Writing back to a MySQL database
url = "jdbc:mysql://localhost:3306/tempdb"
properties = {
    "user": "tmpuser",
    "password": "yd51Tq1@eK",
    "driver": "com.mysql.jdbc.Driver"
}

table_name = "Sales"
df = spark.read.jdbc(url=url, table=table_name, properties=properties)

mode = "overwrite"
df.write.jdbc(url=url, table=table_name, mode=mode, properties=properties)