##### Load (from AWS-S3 Bucket ) and Transform Data By Using Pyspark Dataframe 

#### Import Libraries

In [0]:
!pip install faker

Collecting faker
  Downloading Faker-19.13.0-py3-none-any.whl (1.7 MB)
[?25l[K     |▏                               | 10 kB 5.9 MB/s eta 0:00:01[K     |▍                               | 20 kB 8.7 MB/s eta 0:00:01[K     |▋                               | 30 kB 10.5 MB/s eta 0:00:01[K     |▊                               | 40 kB 5.1 MB/s eta 0:00:01[K     |█                               | 51 kB 5.8 MB/s eta 0:00:01[K     |█▏                              | 61 kB 6.7 MB/s eta 0:00:01[K     |█▎                              | 71 kB 5.0 MB/s eta 0:00:01[K     |█▌                              | 81 kB 5.6 MB/s eta 0:00:01[K     |█▊                              | 92 kB 6.2 MB/s eta 0:00:01[K     |█▉                              | 102 kB 6.5 MB/s eta 0:00:01[K     |██                              | 112 kB 6.5 MB/s eta 0:00:01[K     |██▎                             | 122 kB 6.5 MB/s eta 0:00:01[K     |██▌                             | 133 kB 6.5 MB/s eta 0:00:01[K  

In [0]:
from pyspark.sql import SparkSession ,SQLContext
import os
from faker import Faker
from pathlib import Path
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window
# import configparser
#from dotenv import load_dotenv

##### Read from config file

In [0]:
ACCESS_KEY = "Your Access Key"
SECRET_KEY = "Your Secret Key"
#there is an error pop while you are puting jar packages therefore, you need to ensure again your hadoop home
#if you run spark locally>> os.environ['HADOOP_HOME']="C:\\winutils" 

#### Create spark session & Setup the configuration

In [0]:
def create_spark_session():
    spark= SparkSession.builder.\
                config('spark.master','local').\
                config('spark.app.name','s3_app').\
                getOrCreate()
    return spark

#### Setup Hadoop-AWS configrution to make pyspark able to access s3 bucket

In [0]:
def configure_hadoop_aws():
    spark=create_spark_session()
    spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", ACCESS_KEY)
    spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", SECRET_KEY)
    spark._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.s3a.impl","org.apache.hadoop.fs.s3a.S3A")
    return spark

In [0]:
spark=configure_hadoop_aws()
spark

In [0]:
# get all files in bucket
s3_df=spark.read.csv("s3a://sprints-bucket-for-data-engineer-project/",header=True,inferSchema=True)
s3_df.count()

Out[55]: 67502223

In [0]:
#get sample of data which describe e-commerce customer behavior on Oct and Nov
Oct_sample = spark.read.csv("s3a://sprints-bucket-for-data-engineer-project/2019-Oct-EC-customer-behaviour_85385155-bde1-405e-972b-6031808edb4a.csv",header=True,inferSchema=True)
Nov_sample = spark.read.csv("s3a://sprints-bucket-for-data-engineer-project/2019-Nov-EC-customer-behaviour_00c9adb4-00f0-4c71-b0b0-94ade210d073.csv",header=True,inferSchema=True)
sample_df = Oct_sample.union(Nov_sample)

In [0]:
# #write sample raw data in a table 
# sample_df.write.mode("overwrite").saveAsTable("ecommerce_customer_behavior_Oct_Nov")

#### Data Exploration and Transformation


In [0]:
#data type for each column
sample_df.dtypes

Out[10]: [('event_time', 'timestamp'),
 ('event_type', 'string'),
 ('product_id', 'int'),
 ('category_id', 'bigint'),
 ('category_code', 'string'),
 ('brand', 'string'),
 ('price', 'double'),
 ('user_id', 'int'),
 ('user_session', 'string')]

In [0]:
#Count number of nulls for each column
sample_df.select([count(when(col(c).isNull(), c)).alias(c) for c in sample_df.columns]).show()

+----------+----------+----------+-----------+-------------+------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code| brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+------+-----+-------+------------+
|         0|         0|         0|          0|      1850906|749212|    0|      0|           0|
+----------+----------+----------+-----------+-------------+------+-----+-------+------------+



In [0]:
sample_df.describe().show()

+-------+----------+--------------------+--------------------+-------------------+-------+------------------+-------------------+--------------------+
|summary|event_type|          product_id|         category_id|      category_code|  brand|             price|            user_id|        user_session|
+-------+----------+--------------------+--------------------+-------------------+-------+------------------+-------------------+--------------------+
|  count|   5500000|             5500000|             5500000|            3649094|4750788|           5500000|            5500000|             5500000|
|   mean|      null| 1.102137204313491E7|2.057778111049706...|               null|    NaN|291.07515557269386|5.351721185402945E8|                null|
| stddev|      null|1.2274869995597541E7|1.931905211657705...|               null|    NaN|  356.843136453366|2.022588010267833E7|                null|
|    min|      cart|             1000365| 2053013552226107603|    accessories.bag| a-case|    

In [0]:
sample_df.summary().select(["summary","price"]).show()

+-------+------------------+
|summary|             price|
+-------+------------------+
|  count|           5500000|
|   mean|291.07515557269386|
| stddev|  356.843136453366|
|    min|               0.0|
|    25%|             64.35|
|    50%|            160.36|
|    75%|            361.39|
|    max|           2574.07|
+-------+------------------+



In [0]:
sample_df.count()

Out[14]: 5500000

In [0]:
# drop duplicates rows
sample_df=sample_df.distinct()
sample_df.count()

Out[15]: 5497952

In [0]:
sample_df.select(count(when((col('category_code').isNull() & col('brand').isNull()).alias('cnt'),'cnt'))).show()

+----------------------------------------------------------------------------------+
|count(CASE WHEN ((category_code IS NULL) AND (brand IS NULL)) AS cnt THEN cnt END)|
+----------------------------------------------------------------------------------+
|                                                                            448049|
+----------------------------------------------------------------------------------+



##### Handling Null Values

In [0]:
sample_df=sample_df.fillna("non-brand",subset=['brand'])

In [0]:
# Make sure that I fill all null brands to non-brand
sample_df.select(count(when(col('brand').isNull(), 'brand')).alias('brand') ).show()

+-----+
|brand|
+-----+
|    0|
+-----+



In [0]:
# if there are no null brands the result will be zero
sample_df.select(count(when((col('category_code').isNull() & col('brand').isNull()).alias('cnt'),'cnt'))).show()

+----------------------------------------------------------------------------------+
|count(CASE WHEN ((category_code IS NULL) AND (brand IS NULL)) AS cnt THEN cnt END)|
+----------------------------------------------------------------------------------+
|                                                                                 0|
+----------------------------------------------------------------------------------+



In [0]:
sample_df=sample_df.fillna("misc-categories.misc-products",subset=['category_code'])

In [0]:
# Make sure that I fill all null categories which refers also to products to misc
sample_df.select(count(when(col('category_code').isNull(), 'category_code')).alias('category_code') ).show()

+-------------+
|category_code|
+-------------+
|            0|
+-------------+



In [0]:
#Count number of nulls for each column
sample_df.select([count(when(col(c).isNull(), c)).alias(c) for c in sample_df.columns]).show()

+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|         0|         0|         0|          0|            0|    0|    0|      0|           0|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+



#### Data Preparation 

In [0]:
# create two new columns one for category name and the other for product name by splitting category_code record only first occurance
sample_df=sample_df.withColumn('category_name',udf(lambda x :str(x).split('.',1)[0])('category_code'))
sample_df=sample_df.withColumn('product_name',udf(lambda x :str(x).split('.',1)[1])('category_code'))

In [0]:
##Use window function dense_rank to generate new ids 
#lit() function is used to add constant as a new column to the DataFrame.
sample_df=sample_df.withColumn('order', row_number().over(Window.partitionBy(lit('1')).orderBy(lit('1'))))\
.withColumn('product_id', dense_rank().over(Window.partitionBy().orderBy('product_name'))).orderBy('order')\
.withColumn('category_id', dense_rank().over(Window.partitionBy().orderBy('category_name'))).orderBy('order')\
.withColumn('event_id', dense_rank().over(Window.partitionBy().orderBy('event_type'))).orderBy('order')\
.withColumn('brand_id', dense_rank().over(Window.partitionBy().orderBy('brand'))).orderBy('order')\
.drop('order')
sample_df.show()

+-------------------+----------+----------+-----------+--------------------+----------+------+---------+--------------------+---------------+--------------------+--------+--------+
|         event_time|event_type|product_id|category_id|       category_code|     brand| price|  user_id|        user_session|  category_name|        product_name|event_id|brand_id|
+-------------------+----------+----------+-----------+--------------------+----------+------+---------+--------------------+---------------+--------------------+--------+--------+
|2019-10-03 03:24:52|      view|       102|          8|electronics.smart...|   samsung|253.02|552407273|b034e464-5fb3-457...|    electronics|          smartphone|       3|    2592|
|2019-10-03 03:25:04|      view|        26|          8|  electronics.clocks|    xiaomi|166.03|525487404|3a251387-ced0-497...|    electronics|              clocks|       3|    3239|
|2019-10-03 03:25:28|      view|        13|          8|electronics.audio...| non-brand|676.98|5

In [0]:
# extract date from timestamp & create date calender
sample_df=sample_df.withColumn('date',to_date('event_time'))
sample_df=sample_df.withColumn('year',year(sample_df['event_time']))\
.withColumn('quarter',quarter(sample_df['event_time']))\
.withColumn('month',month(sample_df['event_time']))\
.withColumn('day',dayofmonth(sample_df['event_time']))

###### Calculate User Spending Time Per Product (low level)

In [0]:
sample_df.groupBy(['user_id','user_session','product_name']).agg(count('*').alias("max_pro_n"))\
.filter(col('max_pro_n') >1 ).show()

+---------+--------------------+--------------------+---------+
|  user_id|        user_session|        product_name|max_pro_n|
+---------+--------------------+--------------------+---------+
|515153260|44bf041d-a936-4eb...|       misc-products|       29|
|514110098|778f5ab8-6fd9-4e5...|          smartphone|       11|
|514756936|c59ae8e8-f678-4c1...|          smartphone|        3|
|540642289|ca77e650-0da3-439...|            video.tv|        6|
|525780252|0d82d7c4-2f99-429...|          smartphone|        8|
|514949941|717ba713-3498-44b...|kitchen.refrigera...|        2|
|516002162|6c97702a-3a9e-4a0...|          smartphone|       13|
|545535009|d7341e2e-f2da-466...|       misc-products|        4|
|554403700|faa50fd2-d1b2-4bd...|          smartphone|        6|
|514151488|3c7b31f3-1fd3-4a7...|       misc-products|       21|
|515371454|0c5fd4d4-eb31-4b2...|kitchen.meat_grinder|       12|
|556229624|2aabf009-1e4f-4d3...|          smartphone|        3|
|556219802|abe53c0c-dc31-4b5...|        

In [0]:
maximum_spendtime=sample_df.groupBy(['user_id','user_session','product_name']).agg(max('event_time').alias("max_event_time"))
minimum_spendtime=sample_df.groupBy(['user_id','user_session','product_name']).agg(min('event_time').alias("min_event_time"))

In [0]:
spenttime_df = maximum_spendtime.join(minimum_spendtime,on=['user_id','user_session','product_name'],how='inner')
spenttime_df

Out[28]: DataFrame[user_id: int, user_session: string, product_name: string, max_event_time: timestamp, min_event_time: timestamp]

In [0]:
spenttime_df.count()

Out[29]: 1474625

In [0]:
spenttime_df.withColumn('spent_time(sec)',col("max_event_time").cast("long")-col("min_event_time").cast("long")).show()

+---------+--------------------+------------------+-------------------+-------------------+---------------+
|  user_id|        user_session|      product_name|     max_event_time|     min_event_time|spent_time(sec)|
+---------+--------------------+------------------+-------------------+-------------------+---------------+
|226242984|9c65f95d-2b03-4f9...|   audio.headphone|2019-11-04 12:32:55|2019-11-04 12:32:55|              0|
|249217260|5fa0e6be-3523-431...|     misc-products|2019-11-05 00:02:03|2019-11-05 00:02:03|              0|
|280194708|4c51d9d1-8000-405...|          notebook|2019-11-06 15:23:55|2019-11-06 15:23:02|             53|
|295005705|241bb568-3ec9-42c...|     misc-products|2019-10-03 04:56:19|2019-10-03 04:52:46|            213|
|311301257|f87aa8c1-ad90-40d...|environment.vacuum|2019-10-03 12:12:13|2019-10-03 12:12:13|              0|
|335961922|b0142bec-ff7b-499...|     misc-products|2019-11-06 15:34:32|2019-11-06 15:34:32|              0|
|339186405|15197c7e-aba0-43b

In [0]:
# calculate user spent time on product 
spenttime_df=spenttime_df.withColumn('spent_time(sec)',col("max_event_time").cast("long")-col("min_event_time").cast("long"))

In [0]:
#join the result with whole date set
sample_df=sample_df.join(spenttime_df, on =['user_id','user_session','product_name'], how ='inner')

#### Build a Data Model

#### Create Customers Table

##### Create fake data for customers information

In [0]:
customers_df=sample_df[['user_id']].distinct().sort(col("user_id").asc())

In [0]:
customers_df.count()

Out[34]: 710043

In [0]:
fake = Faker()
fake_names = [fake.name() for _ in range(0,710043)]

In [0]:
fake_address = [fake.address() for _ in range(0,710043)]

In [0]:
fake_email = [fake.email() for _ in range(0,710043)]

In [0]:
customers_df.columns

Out[38]: ['user_id']

In [0]:
window = Window.orderBy("user_id")
customers_df = customers_df.withColumn("customer_name",udf(lambda x: fake_names[x-1])(row_number().over(window))).withColumn("address",udf(lambda x: fake_address[x-1])(row_number().over(window)))\
.withColumn("email",udf(lambda x: fake_email[x-1])(row_number().over(window)))
customers_df.show()

+---------+----------------+--------------------+--------------------+
|  user_id|   customer_name|             address|               email|
+---------+----------------+--------------------+--------------------+
| 10300217| Jordan Gonzalez|43090 Miller Ridg...|  hclark@example.net|
|161080209|   Douglas Walsh|775 Nathan Mounta...|joannchaney@examp...|
|170180460|      Roy Bailey|4875 Todd Crossin...|rcampbell@example...|
|208701646|    Andrea Sloan|3902 Deborah Park...|  eric65@example.org|
|222349603|   Tammy Johnson|1665 Shaw Through...|ejohnson@example.net|
|226242984|   Donna Kennedy|3470 Kayla Spur S...|henrymartin@examp...|
|239846716|     James Olson|PSC 7516, Box 742...|michaelrobinson@e...|
|249217260|   Ashley Monroe|392 King Way\nWil...|deborah94@example...|
|253299396|  Lindsey Guzman|157 Young Corners...|rodgersmatthew@ex...|
|255931440|     Kevin Smith|4963 Charles Cour...| david95@example.org|
|265601964|      Gary Perez|1318 Karina Mall\...|chadsellers@examp...|
|27281

In [0]:
customers_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("customers")

#### Create Products Table

In [0]:
products_df=sample_df[['product_id','product_name']].distinct().sort(col("product_id").asc())

In [0]:
products_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("products")

#### Create Categories Table

In [0]:
categories_df=sample_df[['category_id','category_name']].distinct().sort(col("category_id").asc())

In [0]:
categories_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("categories")

#### Create Brand Table

In [0]:
brands_df=sample_df[['brand_id','brand']].distinct().sort(col("brand_id").asc())
brands_df=brands_df.withColumnRenamed('brand','brand_name')
brands_df.show()

+--------+-----------+
|brand_id| brand_name|
+--------+-----------+
|       1|     a-case|
|       2|    a-derma|
|       3|    a-elita|
|       4|     a-mega|
|       5|   aardwolf|
|       6|        abk|
|       7|     abtoys|
|       8|     accord|
|       9| accumaster|
|      10|        acd|
|      11|    acebeam|
|      12|       acer|
|      13|       aces|
|      14|   achilles|
|      15|        acm|
|      16|       acme|
|      17|      acqua|
|      18|      acron|
|      19|     action|
|      20|actiontrack|
+--------+-----------+
only showing top 20 rows



In [0]:
brands_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("brands")

#### Create Event Types Table

In [0]:
event_types_df=sample_df[['event_id','event_type']].distinct().sort(col("event_id").asc())
event_types_df=event_types_df.withColumnRenamed('event_type','event_name')

In [0]:
event_types_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("event_types")

#### Create Date Dim Table

In [0]:
date_dim_df=sample_df[['date','year','quarter','month','day']].distinct().sort(col("date").asc())

In [0]:
date_dim_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("date_dim")

#### Transactions Fact Table

In [0]:
transactions_df=sample_df.withColumn('transaction_id', row_number().over(Window.partitionBy(lit('1')).orderBy(lit('1'))))\
.drop("product_name","category_name","event_type","brand","category_code","year","quarter","month","day")


In [0]:
transactions_df=transactions_df.withColumnRenamed("spent_time(sec)","spent_time_sec")
transactions_df=transactions_df.sort(col("transaction_id").asc())

In [0]:
transactions_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("transactions_fact")