#Transforming Raw Data into Actionable Business Insights

In [None]:
import sys
!pip install numpy
!pip install plotly
!pip install datapane
!pip install pyspark
!pip install boto3  # AWS SDK to interact with S3
!pip install psycopg2

In [None]:
import sys
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import warnings
import os
from pyspark.sql import SparkSession

# Start SparkSession
spark = SparkSession.builder.appName("EDM_Final_Project").getOrCreate()

warnings.filterwarnings('ignore')

In [None]:
# 1. Load Relational Data from Amazon S3

s3_bucket = "s3a://mys3bucket/data/customer.csv"
df_s3 = spark.read.csv(s3_bucket, header=True, inferSchema=True)

# Show data loaded from S3
df_s3.show()

In [None]:
# 2. Loading Data from Amazon Redshift
# Redshift JDBC connection
redshift_jdbc_url = "jdbc:redshift://redshift-cluster-1.abc123xyz.us-east-1.redshift.amazonaws.com:5439/db01"
redshift_properties = {
    "user": "arbazmd",
    "password": "redshift123$",
    "driver": "com.amazon.redshift.jdbc42.Driver"  # Redshift JDBC driver
}

# Querying data from Redshift
df_redshift = spark.read.jdbc(url=redshift_jdbc_url, table="fact_sales_monthly", properties=redshift_properties)

# Showing data loaded from Redshift
df_redshift.show()

Database version: 8.0.33 


In [None]:
# Example function to run SQL queries on Redshift
def run_query(query: str):
    return spark.read.jdbc(url=redshift_jdbc_url, table=f"({query}) as tmp", properties=redshift_properties)

def run_command(command: str):
    spark.sql(command)

In [None]:
# Example function to show available tables in Redshift
def show_redshift_tables():
    query = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'public' AND table_type = 'BASE TABLE';
    """
    return run_query(query)

# Example of counting rows in a Redshift table
def get_table_row_count(tablename: str):
    query = f"SELECT COUNT(1) FROM {tablename};"
    df = run_query(query)
    return df.collect()[0][0]

# Show tables in Redshift
tables = show_redshift_tables()
tables = tables.withColumn("row_count", [get_table_row_count(t['table_name']) for t in tables.collect()])
tables.show()

Unnamed: 0,TABLE_NAME,row_count
0,dim_customer,209
1,dim_market,27
2,dim_product,397
3,fact_forecast_monthly,1885941
4,fact_sales_monthly,1425706
5,freight_cost,135
6,gross_price,1197
7,manufacturing_cost,1197
8,post_invoice_deductions,2063076
9,pre_invoice_deductions,1045


#Building a dashboard to get business insights

Bar Chart - Total Sales by Product Category:

In [None]:

Total_Sales_by_Product_Category_query = """
SELECT category,
       SUM(sold_quantity) AS total_sold_quantity,
       SUM(sold_quantity * gross_price) AS total_revenue
FROM gdb041.fact_sales_monthly F
JOIN gdb041.gross_price G ON F.product_code = G.product_code
AND YEAR(F.date) = G.fiscal_year
GROUP BY category;
"""

total_sales_df = spark.sql(Total_Sales_by_Product_Category_query)

total_sales_df.show()

In [None]:
total_sales_plot = px.bar(total_sales_df, x='category', y='total_revenue', color = 'category', title='Total Sales by Product Category')
total_sales_plot.show()

Line Chart - Monthly Sales Trends for a Specific Product:

In [None]:
Monthly_Sales_Trends_for_a_Specific_Product_query="""
SELECT date, SUM(sold_quantity) AS monthly_sales
FROM fact_sales_monthly
WHERE product_code = 'A0418150101'
GROUP BY date
ORDER BY date;
"""
sales_trends_df=run_spark.sqlquery(Monthly_Sales_Trends_for_a_Specific_Product_query)
sales_trends_df.show()

In [None]:
monthly_tales_trends = px.line(sales_trends_df, x='date', y='monthly_sales', title='Monthly Sales Trends for a Specific Product')
monthly_tales_trends.show()

Pie Chart - Market Share of Top Products:

In [None]:
Market_Share_of_Top_Products_query="""
WITH top_products AS (
  SELECT product_code, SUM(sold_quantity) AS total_sold
  FROM fact_sales_monthly
  GROUP BY product_code
  ORDER BY total_sold DESC
  LIMIT 5
)
SELECT tp.product_code, tp.total_sold, SUM(total_sold) OVER () AS total_sales
FROM top_products tp;
"""
market_share_df=spark.sql(Market_Share_of_Top_Products_query)
market_share_df.show()

In [None]:
market_share = px.pie(market_share_df, names='product_code', values='total_sold', title='Market Share of Top Products')
market_share.show()

Pie chart - Top 3 Customers by Purchases:

In [None]:
Top_3_Customers_by_Purchases_query="""
SELECT customer_code, customer_name, SUM(sold_quantity) AS total_purchases
FROM gdb041.fact_sales_monthly
WHERE YEAR(date) = YEAR('2020-01-01 00:00:00')
GROUP BY customer_code, customer_name
ORDER BY total_purchases DESC
LIMIT 3;
"""

top_purchases_df=spark.sql(Top_3_Customers_by_Purchases_query)

top_purchases_df.show()

In [None]:
top_3_purchases = px.pie(top_purchases_df, names='customer_name', values='total_purchases', title='Top 3 Customers by Purchases')
top_3_purchases.show()

Time Series - Sales Trends Over Time:

In [None]:
Sales_Trends_Over_Time_query="""
SELECT date, SUM(sold_quantity) AS total_quantity_sold
FROM fact_sales_monthly
GROUP BY date
ORDER BY date;
"""

sales_df=spark.sql(Sales_Trends_Over_Time_query)

sales_df.show()

In [None]:
sales_trend = px.line(sales_df, x='date', y='total_quantity_sold', title='Sales Trends Over Time')
sales_trend.show()

Bar Plot - Total Number of Products sold by Product Category:

In [None]:
Total_Number_of_Products_sold_by_Product_Category_query="""
SELECT category,
       SUM(sold_quantity) AS total_sold_quantity,
       SUM(sold_quantity * gross_price) AS total_revenue
FROM gdb041.fact_sales_monthly F
JOIN gdb041.gross_price G ON F.product_code = G.product_code
AND YEAR(F.date) = G.fiscal_year
GROUP BY category;
"""

total_df=spark.sql(Total_Number_of_Products_sold_by_Product_Category_query)
total_df.show()

In [None]:
total_sold_plot = px.bar(total_df, x='category', y='total_sold_quantity', color = 'category', title='Distribution of Manufacturing Costs by Product')
total_sold_plot.show()

In [None]:
import altair as alt
import datapane as dp
dp.enable_logging()



Putting it into a Datapane report

In [None]:
customers = spark.sql("SELECT COUNT(*) AS row_count FROM dim_customer").collect()[0]['row_count']
markets = spark.sql("SELECT COUNT(*) AS row_count FROM dim_market").collect()[0]['row_count']
products = spark.sql("SELECT COUNT(*) AS row_count FROM dim_product").collect()[0]['row_count']
post_invoice_deductions = spark.sql("SELECT COUNT(*) AS row_count FROM post_invoice_deductions").collect()[0]['row_count']

In [None]:
# Creating the Datapane report with the visualizations and data
r = dp.View(
    dp.Group(
        dp.BigNumber(heading="Total customers", value=customers),
        dp.BigNumber(heading="Total markets", value=markets),
        dp.BigNumber(heading="Total products sold", value=products),
        columns=3,
        name="Little_group",
    ),
    dp.Plot(total_sales_plot, name="total_sales"),           # Ensure this is a pandas DataFrame or Altair chart
    dp.Plot(monthly_tales_trends, name="monthly_tales_trends"), # Ensure this is a pandas DataFrame or Altair chart
    dp.Group(dp.Plot(market_share), dp.Plot(sales_trend), dp.Plot(top_3_purchases), dp.Plot(total_sold_plot), columns=2),
)

# Saving the report as an HTML file
dp.save_report(r, "report.html", open=True)

[[34m08:30:35[0m] [[36mDEBUG[0m] <View version="1" fragment="false">
  <Group name="Little_group" columns="3" valign="top">
    <BigNumber heading="Total customers" value="209" is_positive_intent="false" is_upward_change="false"/>
    <BigNumber heading="Total markets" value="27" is_positive_intent="false" is_upward_change="false"/>
    <BigNumber heading="Total products sold" value="397" is_positive_intent="false" is_upward_change="false"/>
  </Group>
  <Plot type="application/vnd.plotly.v1+json" name="total_sales" responsive="true" scale="1.0" src="ref://327e8c233a"/>
  <Plot type="application/vnd.plotly.v1+json" name="monthly_tales_trends" responsive="true" scale="1.0" src="ref://a7e457a162"/>
  <Group columns="2" valign="top">
    <Plot type="application/vnd.plotly.v1+json" responsive="true" scale="1.0" src="ref://e90514c238"/>
    <Plot type="application/vnd.plotly.v1+json" responsive="true" scale="1.0" src="ref://09ebbb4dda"/>
    <Plot type="application/vnd.plotly.v1+json" r

App saved to ./report.html

In [None]:
import IPython
IPython.display.HTML(filename='report.html')

In [None]:
from google.colab import files

# Downloading the HTML file
files.download('report.html')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [3]:
from google.colab import files
!jupyter nbconvert --to html EDM_Pyspark_Project.ipynb
files.download('EDM_Pyspark_Project.html')

[NbConvertApp] Converting notebook EDM_Pyspark_Project.ipynb to html
[NbConvertApp] Writing 838895 bytes to EDM_Pyspark_Project.html


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>