# Transform Normalized Relational Table for Analysis in Redshift

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("ETL Globalmart") \
        .getOrCreate()

In [1]:
import psycopg2
import boto3
import pyspark
from glob import glob
from pyspark.sql import SparkSession

# Classes and Functions

In [4]:
class GlobalMartDB:
    
    def __init__(self, connection_params):
        self.connection_params = connection_params
        
    def connect(self):
        """ Connect to the PostgreSQL database server """
        try:
            connection = psycopg2.connect(**self.connection_params)
        except (Exception, psycopg2.DatabaseError) as error:
            print(error)
        print("Connection successful")
        self.connection = connection
        return self
    
    def execute(self, sql):
        with self.connection as conn:
            cursor = conn.cursor()
            cursor.execute(sql)
            results = cursor.fetchall()
            return results
        
    def spark_df_builder(self, table, columns):
        sql = f'select * from {table};'
        results = self.execute(sql)
        return spark.createDataFrame(data=results, schema=columns)




In [5]:
def aggregated_df_builder(sql_statements):
    dfs = []
    for statement in sql_statements.values():
        dfs.append(spark.sql(statement))
    return dfs

In [6]:
def spark_reduce(df_list):
    iterator = iter(df_list)
    df = next(iterator)
    for next_df in iterator:
        df = df.join(next_df, on=['customer_id'], how='inner')
    return df

In [7]:
def upload_parquet_files(bucket, prefix, parquet_dir):
    for file in glob(f'{parquet_dir}/*'):
        s3.upload_file(
            Filename = file,
            Bucket = bucket ,
            Key = f'{prefix}/{file}')

In [8]:
rds_connection = {
    'host': 'globalmart.cb5gjaknyj4i.us-east-1.rds.amazonaws.com',
    'database': 'globalmart',
    'user': 'postgres',
    'password': 'password'
}

# RDS Tables

In [21]:
tables = {
    'customers': [
        "customer_id",
        "customer_email",
        "customer_name",
        "segment",
        "country",
        "city",
        "state",
        "postal_code",
        "region",
    ],
    
    'vendors': ["vendor_id", "vendor_name"],
    
    'orders': [
        "order_id",
        "customer_id",
        "vendor_id",
        "ship_mode",
        "order_status",
        "order_purchase_date",
        "order_approved_at",
        "order_delivered_carrier_date",
        "order_delivered_customer_date",
        "order_estimated_delivery_date",
    ],
    
    'returns': ["order_id", "return_reason"],
    
    'transactions': [
        "transaction_id",
        "order_id",
        "ship_date",
        "ship_mode",
        "product_id",
        "category",
        "sub_category",
        "product_name",
        "sales",
        "quantity",
        "discount",
        "profit",
    ],
}

# Transformations

In [22]:
sql_statements = {
    'total_orders': """
                    select customer_id,
                    count(*) as orders
                    from orders
                    group by customer_id;
                    """,
    'total_spent': """
                    with order_totals as(
                    select order_id,
                    sum(sales) as total
                    from transactions
                    group by order_id
                    order by total desc
                    )
                    select customer_id,
                    round(cast(sum(order_totals.total) as decimal(10,2)), 2) as grand_total
                    from orders
                    join order_totals on order_totals.order_id = orders.order_id
                    group by customer_id;
                    """,
    'purchase_frequency': """
                    select customer_id,
                    cast(extract(day from (max(order_purchase_date) - min(order_purchase_date)) / count(order_purchase_date)) as integer) as avg_purchase_freq_days
                    from orders
                    group by customer_id;
                    """,
    'total_returns': """
                    with rtns as(
                    select order_id,
                    count(*) as no_returns
                    from returns
                    group by order_id
                    )
                    select customer_id,
                    sum(rtns.no_returns) as total_returns
                    from orders
                    join rtns on rtns.order_id = orders.order_id
                    group by orders.customer_id;
                    """,
}

In [23]:
db = GlobalMartDB(rds_connection).connect()

Connection successful


# Build Dataframes

In [24]:
table_dfs = {}
for table, columns in tables.items():
    # create a dict of df
    table_dfs[table] = db.spark_df_builder(table, columns)
    # make a view with the name of each of the tables
    table_dfs[table].createOrReplaceTempView(table)

In [25]:
spark.sql('''
with order_totals as(
select order_id,
sum(sales) as total
from transactions
group by order_id
order by total desc
)

select customer_id,
round(cast(sum(order_totals.total) as decimal(10,2)), 2) as grand_total
from orders
join order_totals on order_totals.order_id = orders.order_id
group by customer_id
order by grand_total desc;
''').show(truncate=False)

21/08/29 15:40:50 WARN TaskSetManager: Stage 16 contains a task of very large size (3497 KiB). The maximum recommended task size is 1000 KiB.
21/08/29 15:40:50 WARN TaskSetManager: Stage 17 contains a task of very large size (10419 KiB). The maximum recommended task size is 1000 KiB.

+-----------+-----------+
|customer_id|grand_total|
+-----------+-----------+
|101247     |36214.51   |
|107311     |36117.05   |
|111063     |36046.41   |
|130695     |35918.45   |
|120879     |35918.45   |
|126943     |35865.05   |
|117127     |35865.05   |
|113375     |34765.84   |
|103559     |34765.84   |
|123191     |34446.88   |
|133007     |34446.88   |
|112807     |33369.18   |
|102991     |33369.18   |
|122623     |33345.30   |
|132439     |33345.30   |
|106743     |33148.28   |
|126375     |33112.04   |
|116559     |33112.04   |
|102119     |32875.21   |
|105871     |32819.64   |
+-----------+-----------+
only showing top 20 rows




                                                                                

# Aggregate and View Final Fact Dataframe

In [26]:
agg_dfs = aggregated_df_builder(sql_statements)

In [27]:
fact_table = spark_reduce(agg_dfs)

In [38]:
fact_table.show()

21/08/29 16:03:41 WARN TaskSetManager: Stage 44 contains a task of very large size (3497 KiB). The maximum recommended task size is 1000 KiB.
21/08/29 16:03:41 WARN TaskSetManager: Stage 47 contains a task of very large size (3497 KiB). The maximum recommended task size is 1000 KiB.
21/08/29 16:03:44 WARN TaskSetManager: Stage 48 contains a task of very large size (3497 KiB). The maximum recommended task size is 1000 KiB.
21/08/29 16:03:44 WARN TaskSetManager: Stage 49 contains a task of very large size (10419 KiB). The maximum recommended task size is 1000 KiB.

+-----------+------+-----------+----------------------+-------------+
|customer_id|orders|grand_total|avg_purchase_freq_days|total_returns|
+-----------+------+-----------+----------------------+-------------+
|     100031|    16|    6391.12|                    27|            1|
|     100501|    16|   11838.55|                    35|            1|
|     100565|    16|    9052.58|                    32|            1|
|     100847|    16|    6209.11|                    27|            1|
|     100852|    16|    5033.11|                    29|            1|
|     101011|    16|    7020.04|                    32|            1|
|     101067|    16|    5588.62|                    31|            1|
|     101155|    16|    6663.44|                    27|            1|
|     101221|    16|    5278.70|                    34|            1|
|     101333|    16|    8461.47|                    35|            1|
|     101496|    16|    7124.85|                    34|            1|
|     101507|    16|



                                                                                

In [39]:
fact_table.printSchema()

root
 |-- customer_id: long (nullable = true)
 |-- orders: long (nullable = false)
 |-- grand_total: decimal(10,2) (nullable = true)
 |-- avg_purchase_freq_days: integer (nullable = true)
 |-- total_returns: long (nullable = true)



# Save Dataframe as Parquet and Upload to S3

In [40]:
fact_table.write.mode('overwrite').parquet('customer_fact')

21/08/29 16:04:03 WARN TaskSetManager: Stage 52 contains a task of very large size (3497 KiB). The maximum recommended task size is 1000 KiB.
21/08/29 16:04:04 WARN TaskSetManager: Stage 55 contains a task of very large size (3497 KiB). The maximum recommended task size is 1000 KiB.
21/08/29 16:04:07 WARN TaskSetManager: Stage 56 contains a task of very large size (3497 KiB). The maximum recommended task size is 1000 KiB.
21/08/29 16:04:07 WARN TaskSetManager: Stage 57 contains a task of very large size (10419 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [41]:
s3 = boto3.client('s3', aws_access_key_id='',
                      aws_secret_access_key='')
bucket = 'on-premise-dump-files'

In [42]:
upload_parquet_files(bucket, prefix='transformed/parquet', parquet_dir='customer_fact')