In [182]:
# Load the cleaned dataset through the project-local `load_csv` helper module.
# NOTE(review): `load_csv` is not shown in this notebook; later cells call
# Spark DataFrame methods (count, withColumn, groupBy) on `df`, so it
# presumably returns a pyspark DataFrame — confirm against load_csv.py.
import load_csv
df = load_csv.load("data/clean_data.csv")

In [183]:
from pyspark.sql.functions import countDistinct, col, sum, row_number, when, count, year, month, quarter, round, lag, concat, lit, desc, format_number
from pyspark.sql import Window
from pathlib import Path

In [184]:
import os
from pathlib import Path

# Define the base path for saving files.
# The original location is a user-specific absolute path, which breaks the
# notebook on any other machine. It stays the default for backward
# compatibility, but can be overridden by setting the ANALYSIS_OUTPUT_DIR
# environment variable before the kernel starts (an empty value is ignored).
base_path = os.environ.get("ANALYSIS_OUTPUT_DIR") or (
    "/Users/matthewbernhardt/Desktop/Revature_Project_2/data_analysis_and_cleaner/data/matt"
)

# Ensure the directory exists (parents are created; an existing dir is fine)
Path(base_path).mkdir(parents=True, exist_ok=True)

# Helper used by the analysis cells to persist a result table under base_path
def save_dataframe_to_csv(df, filename):
    """Write ``df`` to ``<base_path>/<filename>`` as a CSV file.

    Args:
        df: object exposing ``toPandas()`` (the Spark DataFrames built in
            this notebook).
        filename: name of the CSV file, joined onto the module-level
            ``base_path``.

    The frame is converted to pandas first and written without the pandas
    index column. Returns ``None``.
    """
    destination = os.path.join(base_path, filename)
    pandas_frame = df.toPandas()
    pandas_frame.to_csv(destination, index=False)



In [185]:
# Sanity check: number of rows in the loaded dataset
df.count()

13523

In [186]:
# List the available column names before deciding which ones to keep
print(df.columns)

['order_id', 'customer_id', 'customer_name', 'product_id', 'product_name', 'product_category', 'payment_type', 'qty', 'price', 'datetime', 'country', 'city', 'ecommerce_website_name', 'payment_txn_id', 'payment_txn_success']


In [187]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Define the function to transform the 'ecommerce_website_name'
def clean_website_name(website_name):
    """Turn a site host such as ``www.amazon.com`` into a display name (``Amazon``).

    Args:
        website_name: raw site string, or ``None`` for a null cell.

    Returns:
        The name with a leading ``www.`` and a trailing ``.com`` removed and
        ``str.capitalize()`` applied (first character upper-cased, the rest
        lower-cased). ``None`` is returned unchanged so that a null value
        does not raise inside the Spark UDF and abort the whole job.
    """
    # Spark passes SQL NULL to Python UDFs as None; propagate it as-is.
    if website_name is None:
        return None
    if website_name.startswith('www.'):
        website_name = website_name[4:]
    if website_name.endswith('.com'):
        website_name = website_name[:-4]
    return website_name.capitalize()

# Register the function as a UDF
# (StringType() declares the UDF's return type as a string column.)
clean_website_name_udf = udf(clean_website_name, StringType())

# Apply the UDF to the DataFrame and overwrite the existing column
# NOTE: this rebinds `df`, so every later cell sees the cleaned site names.
df = df.withColumn('ecommerce_website_name', clean_website_name_udf(col('ecommerce_website_name')))

# Show the results
# (truncate=False prints full cell contents instead of clipping long strings.)
df.show(truncate=False)

+------------------------+----------------+-----------------+----------+-----------------------------------------------------------------------------------------------------------------------------+-------------------+----------------+---+------+----------+----------------------------------------+----------------+----------------------+----------------+-------------------+
|order_id                |customer_id     |customer_name    |product_id|product_name                                                                                                                 |product_category   |payment_type    |qty|price |datetime  |country                                 |city            |ecommerce_website_name|payment_txn_id  |payment_txn_success|
+------------------------+----------------+-----------------+----------+-----------------------------------------------------------------------------------------------------------------------------+-------------------+----------------+---+------+--

In [188]:
# Columns that the revenue analysis below does not use
columns_to_drop = [
    "product_name",
    "product_category",
    "order_id",
    "customer_id",
    "customer_name",
    "product_id",
    "payment_type",
    "payment_txn_id",
    "payment_txn_success",
]

# Keep the result under a new name so the full `df` is still available here
df_dropped = df.drop(*columns_to_drop)

# Preview the trimmed DataFrame
df_dropped.show()


+---+------+----------+--------------------+----------------+----------------------+
|qty| price|  datetime|             country|            city|ecommerce_website_name|
+---+------+----------+--------------------+----------------+----------------------+
|  3| 25.19|2022-05-19|       United States|       Blackhawk|               Wayfair|
|  4| 59.99|2023-06-15|       United States|   Lake Barcroft|               Walmart|
|  6| 17.99|2023-09-11|                Iraq|           Dahūk|                Costco|
|  1| 13.19|2022-09-25|South Georgia And...|       Grytviken|                Amazon|
|  9|  8.39|2024-06-15|                Mali|        Yélimané|                 Apple|
|  7| 35.94|2022-02-14|             Ukraine|           Odesa|               Wayfair|
|  5|119.99|2023-06-08|           Argentina|            Vera|                Amazon|
|  5| 114.0|2024-03-09|               China|          Guilin|                 Apple|
|  4| 59.99|2024-05-08|       United States|       Warrenton|    

In [189]:
# Persist the trimmed dataset as <base_path>/dropped_data.csv
save_dataframe_to_csv(df_dropped, "dropped_data.csv")

In [190]:
def count_unique_values(df):
    """Count the distinct values in every column of a Spark DataFrame.

    Args:
        df: Spark DataFrame to profile.

    Returns:
        dict mapping each column name (in ``df.columns`` order) to its
        distinct-value count as reported by ``countDistinct``. An empty dict
        is returned when the DataFrame has no columns.
    """
    column_names = df.columns
    if not column_names:
        return {}
    # Compute all distinct counts in a single aggregation (one Spark job)
    # instead of triggering a separate collect() per column.
    counts_row = df.select(*[countDistinct(name) for name in column_names]).collect()[0]
    return dict(zip(column_names, counts_row))

# Example usage
# Profile the trimmed DataFrame: distinct-value count for each kept column.
unique_counts = count_unique_values(df_dropped)
print(unique_counts)


{'qty': 12, 'price': 366, 'datetime': 945, 'country': 36, 'city': 99, 'ecommerce_website_name': 11}


In [191]:
# From here on `df` refers to the trimmed DataFrame
# (qty, price, datetime, country, city, ecommerce_website_name).
df = df_dropped

In [192]:
# Revenue per website for each (year, quarter): sales = qty * price.
# `sum` and `round` are the pyspark.sql.functions versions imported above,
# not the Python builtins.
quarterly_sales_df = (
    df.withColumn("year", year(col("datetime")))
    .withColumn("quarter", quarter(col("datetime")))
    .withColumn("sales", col("qty") * col("price"))
    .groupBy("ecommerce_website_name", "year", "quarter")
    .agg(round(sum(col("sales")), 2).alias("total_sales"))
    .withColumn("year_quarter", concat(col("year"), lit(" Q"), col("quarter")))
)

# One row per website, one column per "<year> Q<quarter>" label
pivot_df = (
    quarterly_sales_df.groupBy("ecommerce_website_name")
    .pivot("year_quarter")
    .sum("total_sales")
)

# Render the numeric columns as text with thousands separators and two
# decimals; the website-name column passes through unchanged
formatted_columns = [
    col(name) if name == 'ecommerce_website_name' else format_number(col(name), 2).alias(name)
    for name in pivot_df.columns
]

# Project the formatted expressions
formatted_pivot_df = pivot_df.select(formatted_columns)

# Display the full table without clipping cell contents
formatted_pivot_df.show(truncate=False)



+----------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|ecommerce_website_name|2022 Q1  |2022 Q2  |2022 Q3  |2022 Q4  |2023 Q1  |2023 Q2  |2023 Q3  |2023 Q4  |2024 Q1  |2024 Q2  |2024 Q3  |
+----------------------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|Nike                  |57,589.75|26,094.88|49,466.67|47,400.69|38,535.40|27,308.98|81,187.00|42,430.62|18,024.03|23,389.42|5,281.60 |
|Walmart               |19,646.97|26,336.09|32,397.51|30,020.28|25,008.51|67,240.57|40,993.97|28,361.97|29,345.32|42,140.80|14,581.66|
|Costco                |31,883.04|44,098.56|47,802.18|30,722.84|24,628.72|24,335.49|57,420.53|34,411.59|52,640.26|28,207.73|13,548.66|
|Etsy                  |28,868.99|59,384.34|40,969.01|43,247.15|31,951.87|23,911.51|49,504.27|28,222.88|27,038.40|37,309.45|6,951.83 |
|Wayfair               |34,179.40|28,737.72|26,985.26|2

In [193]:
# Persist the website x year-quarter table. The values were passed through
# format_number above, so the CSV holds comma-formatted text, not raw numbers.
save_dataframe_to_csv(formatted_pivot_df, "website_by_quarter_by_year.csv")

In [194]:
# Overall revenue per website: add up the per-quarter totals computed above
total_sales_df = (
    quarterly_sales_df.groupBy("ecommerce_website_name")
    .agg(round(sum(col("total_sales")), 2).alias("total_sales"))
)

total_sales_df.show(truncate=False)

+----------------------+-----------+
|ecommerce_website_name|total_sales|
+----------------------+-----------+
|Nike                  |416709.04  |
|Walmart               |356073.65  |
|Etsy                  |377359.7   |
|Costco                |389699.6   |
|Wayfair               |351141.93  |
|Bestbuy               |373951.8   |
|Homedepot             |375882.63  |
|Ebay                  |361431.47  |
|Alibaba               |379909.74  |
|Apple                 |335425.55  |
|Amazon                |435260.44  |
+----------------------+-----------+



In [195]:
# Revenue per website and calendar quarter, pooled across all years
# (only the quarter is extracted here; no year column is involved).
# NOTE(review): the label is built from lit(" Q"), so the resulting column
# names carry a leading space (" Q1", ...) — confirm that is intended.
quarterly_sales_df = (
    df.withColumn("quarter", quarter(col("datetime")))
    .withColumn("sales", col("qty") * col("price"))
    .groupBy("ecommerce_website_name", "quarter")
    .agg(round(sum(col("sales")), 2).alias("total_sales"))
    .withColumn("quarter", concat(lit(" Q"), col("quarter")))
)

# Grand total per website, used as the trailing column and as the sort key
total_revenue_by_site = (
    quarterly_sales_df.groupBy("ecommerce_website_name")
    .agg(round(sum(col("total_sales")), 2).alias("total_sales"))
)

# One row per website, one column per quarter label
pivot_df = (
    quarterly_sales_df.groupBy("ecommerce_website_name")
    .pivot("quarter")
    .sum("total_sales")
)

# Attach the grand total and list the highest-revenue websites first
pivot_df = (
    pivot_df.join(total_revenue_by_site, "ecommerce_website_name")
    .orderBy(col("total_sales").desc())
)

# Render the numeric columns (including total_sales) as comma-separated text
# with two decimals; the website-name column passes through unchanged
formatted_columns = [
    col(name) if name == 'ecommerce_website_name' else format_number(col(name), 2).alias(name)
    for name in pivot_df.columns
]

# Project the formatted expressions
formatted_pivot_df = pivot_df.select(formatted_columns)

# Display the full table without clipping cell contents
formatted_pivot_df.show(truncate=False)


+----------------------+----------+----------+----------+----------+-----------+
|ecommerce_website_name| Q1       | Q2       | Q3       | Q4       |total_sales|
+----------------------+----------+----------+----------+----------+-----------+
|Amazon                |87,312.15 |145,922.81|97,400.18 |104,625.30|435,260.44 |
|Nike                  |114,149.18|76,793.28 |135,935.27|89,831.31 |416,709.04 |
|Costco                |109,152.02|96,641.78 |118,771.37|65,134.43 |389,699.60 |
|Alibaba               |131,051.36|92,811.49 |76,131.10 |79,915.79 |379,909.74 |
|Etsy                  |87,859.26 |120,605.30|97,425.11 |71,470.03 |377,359.70 |
|Homedepot             |82,097.85 |132,442.06|103,119.32|58,223.40 |375,882.63 |
|Bestbuy               |105,677.86|106,839.80|84,702.79 |76,731.35 |373,951.80 |
|Ebay                  |106,885.89|123,483.04|64,703.95 |66,358.59 |361,431.47 |
|Walmart               |74,000.80 |135,717.46|87,973.14 |58,382.25 |356,073.65 |
|Wayfair               |110,

In [196]:
# Persist the website x quarter table (comma-formatted text values, sorted by
# total_sales descending) as <base_path>/website_by_quarter.csv
save_dataframe_to_csv(formatted_pivot_df, "website_by_quarter.csv")

In [197]:
# Add a per-row revenue column (qty * price).
# NOTE: this rebinds `df`, so later cells see the extra "revenue" column.
df = df.withColumn("revenue", col("qty") * col("price"))

# Revenue for every (country, website) combination
grouped_df = (
    df.groupBy("country", "ecommerce_website_name")
    .agg(round(sum("revenue"), 2).alias("total_revenue"))
)

# One row per country, one column per website
pivot_df = (
    grouped_df.groupBy("country")
    .pivot("ecommerce_website_name")
    .sum("total_revenue")
)

# Per-country grand total; serves as the sort key here and is reused by
# later cells in this notebook
total_revenue_per_country = (
    grouped_df.groupBy("country")
    .agg(sum("total_revenue").alias("country_total_revenue"))
)

# Attach the grand total and list the highest-revenue countries first
pivot_df = (
    pivot_df.join(total_revenue_per_country, on="country")
    .orderBy(col("country_total_revenue").desc())
)

# The total was only needed for ordering, so remove it from the output
pivot_df = pivot_df.drop("country_total_revenue")

# Display the full table without clipping cell contents
pivot_df.show(truncate=False)


+----------------------------------------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|country                                 |Alibaba  |Amazon  |Apple    |Bestbuy  |Costco   |Ebay     |Etsy     |Homedepot|Nike     |Walmart  |Wayfair  |
+----------------------------------------+---------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|United States                           |154898.15|243406.8|166481.07|194378.43|198490.16|172194.69|199031.58|201572.17|205269.08|138144.46|166859.34|
|Russia                                  |30229.67 |18960.3 |15727.06 |32633.11 |36626.24 |17400.29 |35232.08 |14988.66 |40076.29 |29393.15 |33311.41 |
|China                                   |24424.87 |17023.82|17643.12 |7898.72  |16097.21 |7854.66  |12249.35 |9205.9   |16272.81 |17099.82 |21055.99 |
|Iran                                    |24721.23 |11813.27|6607.43  |6339.7   |6925.77

In [198]:
# Persist the country x website table. Unlike the other exports, this one is
# the unformatted pivot_df (no format_number step was applied in this cell).
save_dataframe_to_csv(pivot_df, "country_website.csv")

In [199]:
# Revenue per country and calendar quarter, pooled across all years
# (only the quarter is extracted here; no year column is involved)
quarterly_sales_df = (
    df.withColumn("quarter", quarter(col("datetime")))
    .withColumn("sales", col("qty") * col("price"))
    .groupBy("country", "quarter")
    .agg(round(sum(col("sales")), 2).alias("total_sales"))
    .withColumn("quarter", concat(lit(" Q"), col("quarter")))
)

# One row per country, one column per quarter label
pivot_df = (
    quarterly_sales_df.groupBy("country")
    .pivot("quarter")
    .sum("total_sales")
)

# Attach the per-country grand total and sort by it, largest first.
# NOTE: total_revenue_per_country is defined in the earlier country/website
# cell, so that cell must have been run before this one.
pivot_df = (
    pivot_df.join(total_revenue_per_country, on="country")
    .orderBy(col("country_total_revenue").desc())
)

# Render the numeric columns as comma-separated text with two decimals;
# the country column passes through unchanged
formatted_columns = [
    col(name) if name == 'country' else format_number(col(name), 2).alias(name)
    for name in pivot_df.columns
]

# Project the formatted expressions
formatted_pivot_df = pivot_df.select(formatted_columns)

# Display the full table without clipping cell contents
formatted_pivot_df.show(truncate=False)

+----------------------------------------+----------+----------+----------+----------+---------------------+
|country                                 | Q1       | Q2       | Q3       | Q4       |country_total_revenue|
+----------------------------------------+----------+----------+----------+----------+---------------------+
|United States                           |522,918.86|576,777.97|505,373.07|435,656.03|2,040,725.93         |
|Russia                                  |87,332.85 |109,532.20|49,724.33 |57,988.88 |304,578.26           |
|China                                   |67,819.16 |42,749.93 |31,676.60 |24,580.58 |166,826.27           |
|Iran                                    |32,664.39 |50,275.65 |9,662.61  |17,833.01 |110,435.66           |
|Japan                                   |22,318.61 |21,069.70 |27,911.08 |17,574.33 |88,873.72            |
|Ukraine                                 |20,490.51 |28,162.34 |14,866.49 |16,244.67 |79,764.01            |
|Mexico            

In [200]:
# Persist the country x quarter table (comma-formatted text values, sorted by
# country_total_revenue descending) as <base_path>/country_by_quarter.csv
save_dataframe_to_csv(formatted_pivot_df, "country_by_quarter.csv")

In [201]:
# Revenue per country and calendar year.
# NOTE: despite the variable name, this frame holds yearly (not quarterly)
# totals; the name is kept because it is a notebook-level variable.
quarterly_sales_df = (
    df.withColumn("year", year(col("datetime")))
    .withColumn("sales", col("qty") * col("price"))
    .groupBy("country", "year")
    .agg(round(sum(col("sales")), 2).alias("total_sales"))
)

# One row per country, one column per year
pivot_df = (
    quarterly_sales_df.groupBy("country")
    .pivot("year")
    .sum("total_sales")
)

# Attach the per-country grand total and sort by it, largest first.
# NOTE: total_revenue_per_country is defined in the earlier country/website
# cell, so that cell must have been run before this one.
pivot_df = (
    pivot_df.join(total_revenue_per_country, on="country")
    .orderBy(col("country_total_revenue").desc())
)

# Render the numeric columns as comma-separated text with two decimals;
# the country column passes through unchanged
formatted_columns = [
    col(name) if name == 'country' else format_number(col(name), 2).alias(name)
    for name in pivot_df.columns
]

# Project the formatted expressions
formatted_pivot_df = pivot_df.select(formatted_columns)

# Display the full table without clipping cell contents
formatted_pivot_df.show(truncate=False)

+----------------------------------------+----------+----------+----------+---------------------+
|country                                 |2022      |2023      |2024      |country_total_revenue|
+----------------------------------------+----------+----------+----------+---------------------+
|United States                           |726,915.08|852,850.05|460,960.80|2,040,725.93         |
|Russia                                  |138,281.83|117,301.74|48,994.69 |304,578.26           |
|China                                   |65,243.68 |63,639.97 |37,942.62 |166,826.27           |
|Iran                                    |48,687.30 |26,738.34 |35,010.02 |110,435.66           |
|Japan                                   |46,614.42 |29,969.98 |12,289.32 |88,873.72            |
|Ukraine                                 |28,604.17 |42,445.68 |8,714.16  |79,764.01            |
|Mexico                                  |23,811.64 |35,499.77 |20,339.58 |79,650.99            |
|Kenya              

In [202]:
from pyspark.sql.functions import col, quarter, year, sum, round, lit, concat, format_number

# Revenue per country for each (year, quarter): sales = qty * price
quarterly_sales_df = (
    df.withColumn("year", year(col("datetime")))
    .withColumn("quarter", quarter(col("datetime")))
    .withColumn("sales", col("qty") * col("price"))
    .groupBy("country", "year", "quarter")
    .agg(round(sum(col("sales")), 2).alias("total_sales"))
    .withColumn("year_quarter", concat(col("year"), lit(" Q"), col("quarter")))
)

# Grand total per country over every year and quarter
total_sales_df = (
    quarterly_sales_df.groupBy("country")
    .agg(round(sum(col("total_sales")), 2).alias("country_total_revenue"))
)

# Quarterly rows enriched with their country's grand total.
# NOTE(review): the pivot below aggregates only total_sales and the totals
# are joined again afterwards, so this join looks redundant — confirm before
# removing it (rows with a null country would be affected).
quarterly_sales_with_total_df = quarterly_sales_df.join(total_sales_df, on="country")

# One row per country, one column per "<year> Q<quarter>" label
pivot_df = (
    quarterly_sales_with_total_df.groupBy("country")
    .pivot("year_quarter")
    .sum("total_sales")
)

# Attach the grand total as the last column and sort by it, largest first
pivot_df = (
    pivot_df.join(total_sales_df, on="country")
    .orderBy(col("country_total_revenue").desc())
)

# Render the numeric columns as comma-separated text with two decimals;
# the country column passes through unchanged
formatted_columns = [
    col(name) if name == 'country' else format_number(col(name), 2).alias(name)
    for name in pivot_df.columns
]

# Project the formatted expressions
formatted_pivot_df = pivot_df.select(formatted_columns)

# Display the full table without clipping cell contents
formatted_pivot_df.show(truncate=False)


+----------------------------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+---------+---------------------+
|country                                 |2022 Q1   |2022 Q2   |2022 Q3   |2022 Q4   |2023 Q1   |2023 Q2   |2023 Q3   |2023 Q4   |2024 Q1   |2024 Q2   |2024 Q3  |country_total_revenue|
+----------------------------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+---------+---------------------+
|United States                           |142,602.60|194,614.77|166,370.03|223,327.68|184,481.98|193,579.26|262,460.46|212,328.35|195,834.28|188,583.94|76,542.58|2,040,725.93         |
|Russia                                  |36,935.89 |44,649.44 |17,704.15 |38,992.35 |23,470.39 |46,986.49 |27,848.33 |18,996.53 |26,926.57 |17,896.27 |4,171.85 |304,578.26           |
|China                                   |24,944.98 |12,299.11 |13,495.18 |

In [203]:
# Persist the country x year-quarter table (comma-formatted text values) as
# <base_path>/country_by_year_and_quarter.csv
save_dataframe_to_csv(formatted_pivot_df, "country_by_year_and_quarter.csv")

24/08/07 15:52:50 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 974371 ms exceeds timeout 120000 ms
24/08/07 15:52:50 WARN SparkContext: Killing executors is not supported by current scheduler.
24/08/07 15:52:55 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$