In [None]:
!pip install matplotlib seaborn plotly pandas pyspark

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql.functions import month, quarter, year, sum as _sum, count

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [None]:
def create_visualizations(sales_df_pd, menu_df_pd, aggregate_df_pd):
    # Convert 'price' in menu_df from string to numeric
    menu_df_pd['price'] = pd.to_numeric(menu_df_pd['price'], errors='coerce')

    # Merge sales_df with menu_df to associate each sale with its price
    sales_with_prices_df = pd.merge(sales_df_pd, menu_df_pd, on='product_id')
    
    # Calculate the total amount of sales in each month
    monthly_sales = sales_with_prices_df.groupby('order_month')['price'].sum().reset_index()

    # Calculate how many times each product was purchased
    product_purchase_count = sales_df_pd.groupby('product_id').size().reset_index(name='product_count')
    product_purchase_count = product_purchase_count.merge(menu_df_pd[['product_id', 'product_name']], on='product_id')

    # Calculate yearly sales
    yearly_sales = sales_with_prices_df.groupby('order_year')['price'].sum().reset_index()

    # Calculate quarterly sales
    quarterly_sales = sales_with_prices_df.groupby('order_quarter')['price'].sum().reset_index()

    # Set the style for seaborn
    sns.set_style("whitegrid")

    # Initialize the matplotlib figure
    fig, axes = plt.subplots(3, 3, figsize=(20, 15))  # Adjust the size as needed
    
    # Plot 1: Total amount spent by each customer
    sns.barplot(x='customer_id', y='total_spent', data=aggregate_df_pd, ax=axes[0, 0], palette='viridis')
    axes[0, 0].set_title('Total amount spent by each customer')

    # Plot 2: Yearly sales
    sns.lineplot(data=yearly_sales, x='order_year', y='price', ax=axes[0, 1], marker='o')
    axes[0, 1].set_title('Yearly Sales')

    # Plot 3: Quarterly sales
    quarterly_sales['quarter'] = 'Q' + quarterly_sales['order_quarter'].astype(str)
    axes[0, 2].pie(quarterly_sales['price'], labels=quarterly_sales['quarter'], autopct='%1.1f%%', startangle=140)
    axes[0, 2].set_title('Quarterly Sales')

    # Plot 4: Total amount of sales in each month
    sns.barplot(x='order_month', y='price', data=monthly_sales, ax=axes[1, 0], palette='deep')
    axes[1, 0].set_title('Total amount of sales in each month')

    # Plot 5: How many times each product purchased
    sns.barplot(x='product_name', y='product_count', data=product_purchase_count, ax=axes[1, 1], palette='muted')
    axes[1, 1].set_title('How many times each product purchased')

    # Plot 6: Top 5 ordered items
    top_products = sales_with_prices_df.groupby('product_name')['price'].sum().nlargest(5).reset_index()
    sns.barplot(x='price', y='product_name', data=top_products, ax=axes[1, 2], palette='Set2')
    axes[1, 2].set_title('Top 5 ordered items')

    # Plot 7: Total sales by order source
    sales_by_source = sales_with_prices_df.groupby('source_order')['price'].sum().reset_index()
    axes[2, 0].pie(sales_by_source['price'], labels=sales_by_source['source_order'], autopct='%1.1f%%', startangle=90)
    axes[2, 0].set_title('Total sales by order source')

    # Plot 8: Total sales by each country
    sales_by_country = sales_with_prices_df.groupby('location')['price'].sum().reset_index()
    sns.barplot(x='price', y='location', data=sales_by_country, ax=axes[2, 1], palette='rocket', orient='h')
    axes[2, 1].set_title('Total Sales by Each Country')

    # Adjust the layout
    plt.tight_layout()

    # Save the plot to a file
    plt.savefig('./sales_dashboard.png')

    # Show the plots
    plt.show()

    return './sales_dashboard.png'

# You would call this function with your actual dataframes like this:
# html_path = create_visualizations(sales_df_pd, menu_df_pd, aggregate_df_pd)
# print(f"Dashboard HTML saved at: {html_path}")


In [None]:

def create_spark_session():
    """Create and return a Spark session."""
    spark = SparkSession.builder \
        .appName("Databricks Refactor") \
        .getOrCreate()
    return spark

def read_data(spark, file_path, schema):
    """Read data from CSV file and return a DataFrame."""
    return spark.read.csv(path=file_path, schema=schema, header=True, inferSchema=True)

def enrich_sales_data(df):
    """Add year, month, and quarter columns to the sales DataFrame."""
    df = df.withColumn("order_year", year("order_date")) \
           .withColumn("order_month", month("order_date")) \
           .withColumn("order_quarter", quarter("order_date"))
    return df

def create_sales_df(spark):
    """Create and return the sales DataFrame."""
    sales_schema = StructType([
        StructField("product_id", IntegerType(), True),
        StructField("customer_id", StringType(), True),
        StructField("order_date", DateType(), True),
        StructField("location", StringType(), True),
        StructField("source_order", StringType(), True)
    ])
    sales_df = read_data(spark, "sales.csv.txt", sales_schema)
    sales_df = enrich_sales_data(sales_df)
    return sales_df

def create_menu_df(spark):
    """Create and return the menu DataFrame."""
    menu_schema = StructType([
        StructField("product_id", IntegerType(), True),
        StructField("product_name", StringType(), True),
        StructField("price", StringType(), True)  # Assuming price is a string that needs conversion
    ])
    menu_df = read_data(spark, "menu.csv.txt", menu_schema)
    return menu_df

def aggregate_data(sales_df, menu_df):
    """Perform various aggregations and display results."""
    # Join sales and menu DataFrames
    joined_df = sales_df.join(menu_df, "product_id")

    # Total amount spent by each customer
    total_amount_spent = joined_df.groupBy("customer_id").agg(_sum("price").alias("total_spent"))
    return total_amount_spent

    # And similarly for other aggregations...

def main():
    spark = create_spark_session()

    sales_df = create_sales_df(spark)
    menu_df = create_menu_df(spark)


    aggregate_df=aggregate_data(sales_df, menu_df)

   #Convert to PandasDF for plotting

    sales_df_pd = sales_df.toPandas()
    menu_df_pd = menu_df.toPandas()
    aggregate_df_pd = aggregate_df.toPandas()

    print(aggregate_df_pd.columns)
    
    #Vizaulization on pllotly and matplotlib, seaborn, sns
    # Call the function with your Pandas DataFrames
    html_path = create_visualizations(sales_df_pd, menu_df_pd, aggregate_df_pd)
    print(f"Dashboard HTML saved at: {html_path}")    
    spark.stop()

if __name__ == "__main__":
    main()
