# Source Data Exploration

## Initializing the SparkSession

In [1]:
from pyspark.sql import SparkSession
# Build the SparkSession used by every cell below; getOrCreate() hands back
# an already-running session if there is one instead of starting a second.
spark = (
    SparkSession.builder
    .appName("Reading source files")
    .getOrCreate()
)

24/09/12 15:25:59 WARN Utils: Your hostname, Ankitas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.4 instead (on interface en0)
24/09/12 15:25:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/12 15:25:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Reading the Source Data

In [26]:
# Load the Parquet data found at the relative path "source" into a DataFrame
df = spark.read.format("parquet").load("source")

### DataFrame operations

In [4]:
# Count the rows in the source DataFrame, then report the total
record_count = df.count()
print("The count of the records is: ", record_count)

The count of the records is:  68


In [5]:
# Preview the data: at most 20 rows, with long cell values truncated.
# These are show()'s defaults, written out here for the reader.
df.show(n=20, truncate=True)

+---+--------------------+---------+------+---------+---------+--------+-------------------+----------------+------------+----------------+
| Op| replicadmstimestamp|invoiceid|itemid| category|    price|quantity|          orderdate|destinationstate|shippingtype|        referral|
+---+--------------------+---------+------+---------+---------+--------+-------------------+----------------+------------+----------------+
|  I|2023-03-26 23:13:...|     9621|    29|Household|46.000000|       1|2016-04-17 05:30:00|              IN|        Free|           Other|
|  I|2023-03-26 23:13:...|     4023|    80|   Office|18.000000|       4|2016-04-01 05:30:00|              ND|       3-Day|           Other|
|  I|2023-03-26 23:13:...|    11369|    56|   Office|37.000000|       1|2016-06-01 05:30:00|              NH|       2-Day| Repeat Customer|
|  I|2023-03-26 23:13:...|    10502|     5|  Kitchen|22.000000|       3|2016-07-02 05:30:00|              MT|       3-Day| Repeat Customer|
|  I|2023-03-26 23:1

In [6]:
# Print the DataFrame's schema as a tree: one line per column, giving its
# data type and whether it is nullable.
df.printSchema()

root
 |-- Op: string (nullable = true)
 |-- replicadmstimestamp: string (nullable = true)
 |-- invoiceid: integer (nullable = true)
 |-- itemid: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- price: decimal(28,6) (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- orderdate: timestamp (nullable = true)
 |-- destinationstate: string (nullable = true)
 |-- shippingtype: string (nullable = true)
 |-- referral: string (nullable = true)



In [11]:
# Keep only the records priced above 30, project them down to invoiceid,
# and display how many such records there are.
pricey_invoice_ids = df.filter("price > 30").select("invoiceid")
pricey_invoice_ids.count()

45

In [17]:
# Count the records per category and show only the categories whose count is
# strictly greater than 10.
# NOTE(review): `> 10` also drops a category with exactly 10 records; confirm
# whether the intended cut-off was `>= 10` ("fewer than 10 records").

category_counts = (
    df.groupBy("category")
    .count()
    .withColumnRenamed("count", "total_count")
)
category_counts.filter("total_count > 10").show()

+---------+-----------+
| category|total_count|
+---------+-----------+
|  Kitchen|         20|
|   Office|         16|
|Household|         16|
|   Garden|         14|
+---------+-----------+



In [37]:
# Number the records inside each category, from the highest price downwards

from pyspark.sql.functions import desc, row_number
from pyspark.sql.window import Window

# One window per category, rows ordered by price from high to low
windowSpec = Window.partitionBy("category").orderBy(desc("price"))

# Attach the per-category position as a new "row_number" column.
# NOTE: `df` is rebound here, so every later cell that uses `df` (including
# the "source_data" temp view) also sees this extra column.
# row_number() hands out consecutive numbers even to records with equal
# prices; the window ordering alone does not decide which of them comes first.
position_in_category = row_number().over(windowSpec)
df = df.withColumn("row_number", position_in_category)

# Display category, price and row_number, sorted by category and then by
# descending price
ranked_view = df.select("category", "price", "row_number")
ranked_view.orderBy("category", desc('price')).show()

+---------+---------+----------+
| category|    price|row_number|
+---------+---------+----------+
|   Garden|95.000000|         1|
|   Garden|92.000000|         2|
|   Garden|85.000000|         3|
|   Garden|84.000000|         4|
|   Garden|73.000000|         5|
|   Garden|73.000000|         6|
|   Garden|69.000000|         7|
|   Garden|66.000000|         8|
|   Garden|60.000000|         9|
|   Garden|46.000000|        10|
|   Garden|34.000000|        11|
|   Garden|24.000000|        12|
|   Garden|12.000000|        13|
|   Garden| 9.000000|        14|
|Household|96.000000|         1|
|Household|90.000000|         2|
|Household|88.000000|         3|
|Household|83.000000|         4|
|Household|80.000000|         5|
|Household|68.000000|         6|
+---------+---------+----------+
only showing top 20 rows



### SQL operations

In [38]:
# Expose the DataFrame to Spark SQL under the name "source_data", then count
# its rows with a query instead of the DataFrame API.
df.createOrReplaceTempView("source_data")
sqlDF = spark.sql(
    "SELECT COUNT(*) FROM source_data"
)
sqlDF.show()

+--------+
|count(1)|
+--------+
|      68|
+--------+



In [39]:
# Count the rows of the "source_data" view whose price is above 20.
# The view must already be registered via createOrReplaceTempView.
sqlDF = spark.sql(
    "SELECT count(*) FROM source_data where price > 20"
)
sqlDF.show()

+--------+
|count(1)|
+--------+
|      54|
+--------+



In [41]:
# List the columns of the "source_data" view with their data types by
# running a DESCRIBE statement.
sqlDF = spark.sql(
    "DESCRIBE source_data"
)
sqlDF.show()

+-------------------+-------------+-------+
|           col_name|    data_type|comment|
+-------------------+-------------+-------+
|                 Op|       string|   null|
|replicadmstimestamp|       string|   null|
|          invoiceid|          int|   null|
|             itemid|          int|   null|
|           category|       string|   null|
|              price|decimal(28,6)|   null|
|           quantity|          int|   null|
|          orderdate|    timestamp|   null|
|   destinationstate|       string|   null|
|       shippingtype|       string|   null|
|           referral|       string|   null|
|         row_number|          int|   null|
+-------------------+-------------+-------+



## End the session

In [42]:
# Stop the Spark session now that the exploration is finished
spark.stop()