Import the required libraries and store the path to the Google credentials JSON file in a variable.

In [101]:
import os

import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col,isnan,when,year,month
# Path to the Google service-account key JSON used for GCS and BigQuery access.
# Prefer the standard GOOGLE_APPLICATION_CREDENTIALS environment variable so the
# notebook is not tied to one user's home directory; fall back to the local path.
credentials_location = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/home/amanhd9/.google/credentials/google_credentials.json",
)


Configure the Spark session to connect to Google Cloud Storage (GCS) and Google BigQuery.

In [102]:

# Jar directory of the local Spark install holding the GCS and BigQuery connectors.
SPARK_JARS_DIR = "/home/amanhd9/SPARK/spark-3.3.3-bin-hadoop3/jars"

# `spark.jars` takes ONE comma-separated list of jars. A second
# .set("spark.jars", ...) replaces the first value instead of appending to it.
spark_jars = ",".join([
    f"{SPARK_JARS_DIR}/gcs-connector-hadoop3-latest.jar",
    f"{SPARK_JARS_DIR}/spark-3.3-bigquery-0.32.2.jar",
])

conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set("spark.jars", spark_jars)
    # spark.hadoop.* properties are passed through to the Hadoop configuration.
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

# getOrCreate keeps this cell re-runnable: SparkContext(conf=...) raises if a
# context is already running. An already-running context is reused as-is and
# does NOT pick up changes made to `conf` above.
sc = SparkContext.getOrCreate(conf=conf)

# Register the GCS connector as the handler for gs:// paths and point it at the
# service-account key file.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()
    

Read the sales and stores tables from the staging (`stg`) schema of the Google BigQuery warehouse using the `spark.read` method.

In [None]:
def _read_bigquery_table(table_name):
    """Load the BigQuery table `table_name` into a Spark DataFrame (format "bigquery")."""
    reader = spark.read.format("bigquery")
    return reader.option("table", table_name).load()


# Fully qualified project.dataset.table names in the staging (stg) dataset.
sales_table_name = "data-eng-399920.stg.sales-data"
stores_table_name = "data-eng-399920.stg.stores-data"

df_sales = _read_bigquery_table(sales_table_name)
df_stores = _read_bigquery_table(stores_table_name)


Check the row count of the sales data frame.

In [103]:
# Number of rows in the raw sales data frame (triggers a Spark job).
sales_row_count = df_sales.count()
sales_row_count

19454838

In [104]:
# Preview the first 20 rows with long values truncated (the show() defaults).
df_sales.show(n=20, truncate=True)

+----------+--------+-------------------+-----+-------+-----+-----+------------+-----------+------------+-----------+----------------+---------------------+
|product_id|store_id|               date|sales|revenue|stock|price|promo_type_1|promo_bin_1|promo_type_2|promo_bin_2|promo_discount_2|promo_discount_type_2|
+----------+--------+-------------------+-----+-------+-----+-----+------------+-----------+------------+-----------+----------------+---------------------+
|     P0512|   S0072|2019-10-25 00:00:00|  0.0|    0.0|  4.0| 19.9|        PR10|       high|        PR03|       null|            null|                 null|
|     P0512|   S0073|2019-10-25 00:00:00|  0.0|    0.0|  3.0| 19.9|        PR10|       high|        PR03|       null|            null|                 null|
|     P0512|   S0074|2019-10-25 00:00:00|  0.0|    0.0| 11.0| 19.9|        PR10|       high|        PR03|       null|            null|                 null|
|     P0512|   S0075|2019-10-25 00:00:00|  0.0|    0.0|  6

The data frame structure shows columns that are not relevant for sales and revenue insights, so they are dropped.
Many rows also have sales equal to 0. These rows contribute nothing to sales or revenue totals, so they are dropped as well.

In [105]:
# Stock and promotion columns are not used for the sales/revenue insights.
unused_sales_columns = [
    'stock',
    'promo_type_1', 'promo_bin_1',
    'promo_type_2', 'promo_bin_2',
    'promo_discount_2', 'promo_discount_type_2',
]

df_sales = (
    df_sales
    .drop(*unused_sales_columns)
    .orderBy('date')
    # Keep non-zero sales only; null sales are dropped too (the comparison is null).
    .filter('sales != 0.0')
)
df_sales.show()
df_sales.count()

                                                                                

+----------+--------+-------------------+-----+-------+-----+
|product_id|store_id|               date|sales|revenue|price|
+----------+--------+-------------------+-----+-------+-----+
|     P0017|   S0025|2017-01-02 00:00:00|  1.0|   1.38| 1.49|
|     P0015|   S0082|2017-01-02 00:00:00|  1.0|   2.41|  2.6|
|     P0017|   S0020|2017-01-02 00:00:00|  3.0|   4.14| 1.49|
|     P0015|   S0001|2017-01-02 00:00:00|  1.0|   2.41|  2.6|
|     P0015|   S0068|2017-01-02 00:00:00|  1.0|   2.41|  2.6|
|     P0016|   S0032|2017-01-02 00:00:00|  1.0|   1.85|  2.0|
|     P0017|   S0015|2017-01-02 00:00:00|  2.0|   2.76| 1.49|
|     P0001|   S0056|2017-01-02 00:00:00|  1.0|    5.3| 6.25|
|     P0004|   S0044|2017-01-02 00:00:00|  1.0|   3.81|  4.5|
|     P0015|   S0032|2017-01-02 00:00:00|  1.0|   2.41|  2.6|
|     P0015|   S0040|2017-01-02 00:00:00|  1.0|   2.41|  2.6|
|     P0015|   S0085|2017-01-02 00:00:00|  1.0|   2.41|  2.6|
|     P0015|   S0130|2017-01-02 00:00:00|  1.0|   2.41|  2.6|
|     P0

3189682

In [106]:
# Preview the first 20 store rows with long values truncated (the show() defaults).
df_stores.show(n=20, truncate=True)

+--------+------------+----------+-------+
|store_id|storetype_id|store_size|city_id|
+--------+------------+----------+-------+
|   S0091|        ST04|        19|   C013|
|   S0012|        ST04|        28|   C005|
|   S0045|        ST04|        17|   C008|
|   S0032|        ST03|        14|   C019|
|   S0027|        ST04|        24|   C022|
|   S0088|        ST04|        20|   C009|
|   S0095|        ST02|        44|   C014|
|   S0055|        ST04|        24|   C014|
|   S0099|        ST03|        14|   C014|
|   S0078|        ST04|        19|   C036|
|   S0006|        ST03|         8|   C024|
|   S0135|        ST03|        16|   C035|
|   S0066|        ST04|        47|   C033|
|   S0106|        ST04|        21|   C031|
|   S0112|        ST04|        46|   C031|
|   S0059|        ST03|        15|   C014|
|   S0129|        ST04|        18|   C021|
|   S0116|        ST03|        15|   C031|
|   S0098|        ST03|        15|   C022|
|   S0049|        ST04|        25|   C031|
+--------+-

The stores data frame holds each store's type, size and city. Joining it with the sales data frame gives a better picture of which stores or cities generate the most revenue.

In [107]:
# Attach store type, size and city to every sales row. A left join keeps all
# sales rows; an outer join would additionally emit one row per store without
# sales, with null product_id/date/sales/revenue. Such rows would skew the
# revenue analysis and later surface as a (None, None) year/month key.
df_join = (
    df_sales
    .join(df_stores, on='store_id', how='left')
    .orderBy('date', 'store_id')
)
df_join.show()

                                                                                

+--------+----------+-------------------+-----+-------+-----+------------+----------+-------+
|store_id|product_id|               date|sales|revenue|price|storetype_id|store_size|city_id|
+--------+----------+-------------------+-----+-------+-----+------------+----------+-------+
|   S0001|     P0499|2017-01-02 00:00:00|  1.0|   0.69| 0.75|        ST04|        41|   C031|
|   S0001|     P0311|2017-01-02 00:00:00|  3.0|    3.0|  1.0|        ST04|        41|   C031|
|   S0001|     P0439|2017-01-02 00:00:00|  3.0|  20.97| 8.25|        ST04|        41|   C031|
|   S0001|     P0148|2017-01-02 00:00:00|  4.0|   21.3| 5.75|        ST04|        41|   C031|
|   S0001|     P0287|2017-01-02 00:00:00|  1.0|   5.56|  7.5|        ST04|        41|   C031|
|   S0001|     P0348|2017-01-02 00:00:00|  2.0|   3.11|  2.1|        ST04|        41|   C031|
|   S0001|     P0438|2017-01-02 00:00:00|  9.0|   2.08| 0.25|        ST04|        41|   C031|
|   S0001|     P0035|2017-01-02 00:00:00|  2.0|   4.54| 2.45

Check every column for null values.

In [109]:
# Null count per column, computed in ONE aggregation pass over df_join rather than
# one Spark job (full scan) per column. count() skips nulls, so counting
# when(is_null, 1) counts exactly the rows where the column is null.
null_count_exprs = [
    F.count(F.when(F.col(c).isNull(), 1)).alias(c) for c in df_join.columns
]
Dict_Null = df_join.select(null_count_exprs).first().asDict()
Dict_Null

                                                                                

{'store_id': 0,
 'product_id': 0,
 'date': 0,
 'sales': 0,
 'revenue': 0,
 'price': 145892,
 'storetype_id': 0,
 'store_size': 0,
 'city_id': 0}

In [110]:
# Show the joined rows whose price is missing.
price_is_null = df_join['price'].isNull()
df_join.where(price_is_null).show()



+--------+----------+-------------------+-----+-------+-----+------------+----------+-------+
|store_id|product_id|               date|sales|revenue|price|storetype_id|store_size|city_id|
+--------+----------+-------------------+-----+-------+-----+------------+----------+-------+
|   S0028|     P0348|2017-01-02 00:00:00|  3.0|   4.58| null|        ST01|        86|   C036|
|   S0028|     P0177|2017-01-02 00:00:00|  4.0|   6.78| null|        ST01|        86|   C036|
|   S0028|     P0333|2017-01-02 00:00:00|  8.0|  14.81| null|        ST01|        86|   C036|
|   S0028|     P0079|2017-01-02 00:00:00|  1.0|   2.08| null|        ST01|        86|   C036|
|   S0028|     P0131|2017-01-02 00:00:00|  4.0|   8.33| null|        ST01|        86|   C036|
|   S0028|     P0226|2017-01-02 00:00:00|  3.0|   4.17| null|        ST01|        86|   C036|
|   S0028|     P0327|2017-01-02 00:00:00|  1.0|   2.64| null|        ST01|        86|   C036|
|   S0028|     P0018|2017-01-02 00:00:00|  2.0|   3.61| null

                                                                                

The price column contains many null values (145,892). They do not affect the sales and revenue figures, because sales and revenue are stored in their own columns.

The resulting data frame is written to the production schema as the revenue-data table. For easier visualization and month-to-month comparison, it is also split into one data frame per month.

In [111]:
# Use the module-qualified F.year / F.month so this cell does not depend on the
# bare names `year` / `month`, which are easily rebound later in the notebook
# (e.g. as loop variables). Rebinding them would make this cell fail on re-run.
df_join_with_year_month = (
    df_join
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
)

# Distinct (year, month) pairs, sorted so downstream iteration is chronological.
unique_year_month = (
    df_join_with_year_month
    .select("year", "month")
    .distinct()
    .orderBy("year", "month")
    .collect()
)

                                                                                

In [112]:
# One (lazily evaluated) data frame per (year, month) pair, keyed by that pair.
# A dict comprehension keeps the loop variable local, so the imported Spark
# functions `year` and `month` are not overwritten with ints. Overwriting them
# would break any later call such as year("date").
year_month_data_frames = {
    (row.year, row.month): df_join_with_year_month.filter(
        (df_join_with_year_month["year"] == row.year)
        & (df_join_with_year_month["month"] == row.month)
    )
    for row in unique_year_month
}
    
    

The `year_month_data_frames` dictionary holds one data frame per (year, month) pair. These data frames are useful for comparing monthly revenue in Looker Studio. They are written to BigQuery tables by `./Spark/transform_upload.py`.

In [114]:
# Stop the Spark session created at the top of the notebook and release its
# resources; earlier cells need a new session if they are run again.
spark.stop()