In [18]:
import os
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DecimalType
import requests

In [None]:
# Start Spark for the currency-conversion step (getOrCreate reuses a running session if one exists)
spark = (
    SparkSession.builder
    .appName("CurrencyConversion")
    .getOrCreate()
)

Convert the sales data in sales_and_traffic_data.csv to euros, using
amazon_shop_mapping.csv to find each shop's currency and the currency conversion
API provided (or any forex API of your choice).

In [None]:
# Read the sales data and shop mapping data.
# The Windows path must be a raw string: in a normal literal "\U" begins a
# \UXXXXXXXX unicode escape, which made the original paths a SyntaxError.
# Set the DATA_DIR environment variable to read the CSVs from another folder.
DATA_DIR = os.environ.get("DATA_DIR", r"C:\Users\AmmadAnwar")
sales_data = spark.read.csv(os.path.join(DATA_DIR, "sales_and_traffic_data.csv"), header=True)
shop_mapping_data = spark.read.csv(os.path.join(DATA_DIR, "amazon_shop_mapping.csv"), header=True)

In [None]:
# Join the sales data with shop mapping data to get the currency for each shop
joined_data = sales_data.join(shop_mapping_data, on=["shop_name"], how="left")

# currencylayer "convert" endpoint (the API suggested by the task). The access key
# is read from the environment instead of being hardcoded in the notebook.
# NOTE(review): the convert endpoint may require a specific currencylayer plan — confirm.
CURRENCY_API_URL = "https://api.currencylayer.com/convert"
CURRENCY_API_KEY = os.environ.get("CURRENCYLAYER_ACCESS_KEY", "")
CURRENCY_API_TIMEOUT_S = 10


# Define a UDF to perform currency conversion using an API.
# NOTE(review): this issues one HTTP request per non-EUR row; for large data consider
# fetching one rate per distinct currency and joining instead.
@udf(DecimalType(10, 2))
def convert_to_euro(amount, currency):
    """Convert ``amount`` expressed in ``currency`` to EUR via the currencylayer API.

    Args:
        amount: Sales amount. The CSV is read without a schema, so this arrives as a string.
        currency: Currency code from the shop mapping; None when the shop had no
            mapping row (left join).

    Returns:
        A ``decimal.Decimal`` rounded to 2 places (what a DecimalType UDF expects), or
        None when an input is missing/unparseable or the API call fails.
    """
    from decimal import Decimal, InvalidOperation

    if amount is None or currency is None:
        return None
    try:
        amount_dec = Decimal(str(amount).strip())
    except InvalidOperation:
        return None
    if not amount_dec.is_finite():
        return None

    cents = Decimal("0.01")
    source_currency = str(currency).strip().upper()
    # Amounts already in euro need no API round trip.
    if source_currency == "EUR":
        return amount_dec.quantize(cents)

    try:
        response = requests.get(
            CURRENCY_API_URL,
            params={
                "access_key": CURRENCY_API_KEY,
                "from": source_currency,
                "to": "EUR",
                "amount": str(amount_dec),
            },
            # requests never times out by default; a stalled call would hang the task.
            timeout=CURRENCY_API_TIMEOUT_S,
        )
    except requests.RequestException:
        return None
    if response.status_code != 200:
        return None
    try:
        payload = response.json()
    except ValueError:
        return None
    # An error payload carries no "result" field.
    result = payload.get("result") if isinstance(payload, dict) else None
    if result is None:
        return None
    # Go through str() so the float result does not leak binary-float artefacts.
    return Decimal(str(result)).quantize(cents)

In [None]:
# Apply the UDF to convert sales columns to euro
for column in ["ordered_products_sale", "ordered_products_sales_b2b"]:
    joined_data = joined_data.withColumn(column + "_euro", convert_to_euro(col(column), col("currency")))

# Select relevant columns and drop unnecessary ones
result_data = joined_data.select(
    "child_asin", "sessions", "page_views", "units_ordered", "units_ordered_b2b",
    "ordered_products_sale_euro", "ordered_products_sales_b2b_euro",
    "total_ordered_items", "total_ordered_items_b2b", "region", "shop_name", "report_date"
)

# Persist the converted data. The total-revenue and per country/shop/month cells
# read "converted_sales_data.csv", which nothing earlier in the notebook writes.
# Spark writes a directory of part files under this name; spark.read.csv reads it
# back as one dataset. "overwrite" keeps the cell re-runnable.
# Note: this action evaluates the API-calling UDF for every row.
result_data.write.csv("converted_sales_data.csv", header=True, mode="overwrite")

# Show the resulting DataFrame
result_data.show()

# Stop the Spark session
spark.stop()

Calculate the total revenue.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, lit

# Create a Spark session
spark = SparkSession.builder.appName("TotalRevenueCalculation").getOrCreate()

# Read the converted sales data
sales_data = spark.read.csv("converted_sales_data.csv", header=True)

# Per-row revenue in EUR. The CSV is read without a schema, so both columns are
# strings: cast them to decimal (exact money arithmetic instead of an implicit
# cast to double), and treat a missing amount as 0. With a plain "+", a null in
# either column made the whole row null, so sum() silently dropped the other amount too.
# NOTE(review): if ordered_products_sales_b2b is already included in
# ordered_products_sale, adding them double-counts B2B sales; the per
# country/shop/month cell sums only ordered_products_sale_euro. Confirm which
# revenue definition is intended.
MONEY_TYPE = "decimal(18,2)"
row_revenue = (
    coalesce(col("ordered_products_sale_euro").cast(MONEY_TYPE), lit(0))
    + coalesce(col("ordered_products_sales_b2b_euro").cast(MONEY_TYPE), lit(0))
)

# Calculate the total revenue (None if the file has no rows)
total_revenue = (
    sales_data
    .select(row_revenue.alias("total_revenue"))
    .agg({"total_revenue": "sum"})
    .collect()[0][0]
)

# Show the total revenue
print("Total Revenue in Euro:", total_revenue)

# Stop the Spark session
spark.stop()

Present the total revenue per country, per shop, and per month, each in a
separate DataFrame.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, month, year
from pyspark.sql.functions import sum as spark_sum  # alias: don't shadow the builtin sum()

# Create a Spark session
spark = SparkSession.builder.appName("TotalRevenuePerCountryShopMonth").getOrCreate()

# Read the converted sales data
sales_data = spark.read.csv("converted_sales_data.csv", header=True)

# Derive the grouping keys: country/shop are copies of region/shop_name,
# year/month are taken from report_date.
# NOTE(review): assumes report_date is in a format Spark can cast to a date
# (e.g. yyyy-MM-dd) — confirm against the data.
# The CSV has no schema, so the euro amount is a string; cast it to decimal so
# revenue is summed exactly rather than through an implicit double cast.
# NOTE(review): only ordered_products_sale_euro is summed here, while the total
# revenue cell also adds ordered_products_sales_b2b_euro — confirm which
# definition is intended.
sales_data = (
    sales_data
    .withColumn("country", col("region"))
    .withColumn("shop", col("shop_name"))
    .withColumn("month", month(col("report_date")))
    .withColumn("year", year(col("report_date")))
    .withColumn("revenue_euro", col("ordered_products_sale_euro").cast("decimal(18,2)"))
)


def total_revenue_by(df, *keys):
    """Return one row per distinct ``keys`` combination, with the summed
    ``revenue_euro`` column as ``total_revenue``, ordered by ``keys``."""
    return (
        df.groupBy(*keys)
        .agg(spark_sum("revenue_euro").alias("total_revenue"))
        .orderBy(*keys)
    )


# The task asks for separate DataFrames. Month is keyed by (year, month) so the
# same calendar month in different years is not merged.
total_revenue_per_country = total_revenue_by(sales_data, "country")
total_revenue_per_shop = total_revenue_by(sales_data, "shop")
total_revenue_per_month = total_revenue_by(sales_data, "year", "month")

# Show the results
total_revenue_per_country.show()
total_revenue_per_shop.show()
total_revenue_per_month.show()

# Stop the Spark session
spark.stop()


Replicate the above questions in SQL.

In [None]:

-- Create a table holding the sales data together with each shop's currency.
-- NOTE(review): despite its name, this table stores amounts in their ORIGINAL
-- currency; the euro conversion and the total / per country / per shop / per month
-- revenue queries from the PySpark cells are not replicated here yet (TODO).
-- NOTE(review): this is plain SQL inside a notebook; it needs a SQL kernel/magic
-- (or spark.sql) to execute — confirm how it is meant to be run.
CREATE TABLE converted_sales_data (
    child_asin VARCHAR(255),
    sessions INT,
    page_views INT,
    units_ordered INT,
    units_ordered_b2b INT,
    ordered_products_sale DECIMAL(10, 2),
    ordered_products_sales_b2b DECIMAL(10, 2),
    total_ordered_items INT,
    total_ordered_items_b2b INT,
    region VARCHAR(255),
    shop_name VARCHAR(255),
    report_date DATE,
    currency VARCHAR(255)
);

-- Insert the sales rows with the currency looked up in amazon_shop_mapping.
-- LEFT JOIN mirrors the PySpark cell (how="left"): shops missing from the mapping
-- are kept with a NULL currency instead of being silently dropped.
INSERT INTO converted_sales_data
SELECT
    sd.child_asin,
    sd.sessions,
    sd.page_views,
    sd.units_ordered,
    sd.units_ordered_b2b,
    sd.ordered_products_sale,
    sd.ordered_products_sales_b2b,
    sd.total_ordered_items,
    sd.total_ordered_items_b2b,
    sd.region,
    sd.shop_name,
    sd.report_date,
    asm.currency
FROM
    sales_data sd
LEFT JOIN
    amazon_shop_mapping asm
ON
    sd.shop_name = asm.shop_name;

Transform campaign_object.csv by splitting the CREATIVE column into
multiple columns (brandName, brandLogoAssetID, headline, asins, brandLogoUrl).

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

In [None]:
# Start (or reuse) the Spark session used for the campaign transformation
spark = (
    SparkSession.builder
    .appName("CampaignObjectTransform")
    .getOrCreate()
)

# Load campaign_object.csv: first row is the header, column types are inferred
campaign_df = spark.read.csv(
    "path/to/campaign_object.csv",
    header=True,
    inferSchema=True,
)

# Expected JSON layout of the CREATIVE column (all fields nullable; asins is a list of strings)
creative_schema = StructType(
    [
        StructField("brandName", StringType(), True),
        StructField("brandLogoAssetID", StringType(), True),
        StructField("headline", StringType(), True),
        StructField("asins", ArrayType(StringType()), True),
        StructField("brandLogoUrl", StringType(), True),
    ]
)

# Parse the CREATIVE JSON text into an intermediate struct column
parsed_creative = from_json(col("CREATIVE"), creative_schema)
campaign_df = campaign_df.withColumn("CREATIVE_JSON", parsed_creative)

# Promote each struct field to its own top-level column, in schema order
for field_name in creative_schema.fieldNames():
    campaign_df = campaign_df.withColumn(field_name, col("CREATIVE_JSON." + field_name))

# The raw JSON text and the intermediate struct are no longer needed
campaign_df = campaign_df.drop("CREATIVE", "CREATIVE_JSON")

# Display the flattened campaign data
campaign_df.show()

Split the newly formatted asins column into 3 new
columns: asin_1, asin_2, asin_3.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Reuse the running Spark session (getOrCreate returns it if one exists)
spark = SparkSession.builder.appName("SeparateAsins").getOrCreate()

# campaign_df is produced by the CREATIVE transformation cell and must already
# contain the parsed `asins` array column.

# asin_1, asin_2, asin_3 take array positions 0, 1, 2 of `asins`.
# NOTE(review): arrays shorter than 3 give null here in non-ANSI mode; with ANSI
# mode enabled Spark may raise instead — confirm the session setting.
asins_array = col("asins")
for position in range(3):
    campaign_df = campaign_df.withColumn("asin_{}".format(position + 1), asins_array.getItem(position))

# Display the campaign data with the split ASIN columns
campaign_df.show()


Using sales_and_traffic_data.csv, extract the list of distinct ASINs and save it in a
DataFrame.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session
spark = SparkSession.builder.appName("ExtractDistinctAsins").getOrCreate()

# Read the sales_and_traffic_data.csv file into a DataFrame
sales_data_df = spark.read.csv("path/to/sales_and_traffic_data.csv", header=True)

# Select the distinct, non-null 'child_asin' values into a new DataFrame.
# A null is not an ASIN but would otherwise show up as one "distinct" value.
distinct_asins_df = (
    sales_data_df
    .select(col("child_asin").alias("distinct_asin"))
    .where(col("distinct_asin").isNotNull())
    .distinct()
)

# Show the distinct asin values
distinct_asins_df.show()

# The session is intentionally NOT stopped here: DataFrames created in it
# (e.g. distinct_asins_df, which the next cell collects) cannot be evaluated
# once their SparkSession has been stopped. The last cell stops it.


In the new DataFrame built from campaign_object.csv, create a column
called active_asin containing the first ASIN that exists in the
distinct ASIN list (null if none of them does).
- Example: if asin_1 exists in the distinct ASIN list,
  then active_asin = asin_1; otherwise compare asin_2, and
  if asin_2 is not in the distinct ASIN list either, move on to asin_3.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, when

# Create (or reuse) a Spark session
spark = SparkSession.builder.appName("CreateActiveAsin").getOrCreate()

# Start from campaign_df built in the previous cells, not a fresh read of
# campaign_object.csv: the raw file has no asin_1/asin_2/asin_3 columns — they
# only exist after CREATIVE was parsed and the asins array was split.
campaign_data_df = campaign_df

# Distinct ASINs collected to the driver. Nulls are dropped so a missing
# asin_N can never "match" the list.
distinct_asin_list = [
    row.distinct_asin for row in distinct_asins_df.collect() if row.distinct_asin is not None
]

# active_asin = first of asin_1, asin_2, asin_3 that is in the distinct list, else null.
# when(...) without otherwise() yields null for a non-match (or a null asin_N), and
# coalesce() picks the first non-null. This uses native Spark expressions instead of
# a Python UDF doing a linear list scan per value.
active_asin_expr = coalesce(
    *[when(col(name).isin(distinct_asin_list), col(name)) for name in ("asin_1", "asin_2", "asin_3")]
)

# Add the 'active_asin' column
campaign_data_df = campaign_data_df.withColumn("active_asin", active_asin_expr)

# Show the DataFrame with the 'active_asin' column
campaign_data_df.show()

# Stop the Spark session
spark.stop()
