In [1]:
# Import the required packages
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import when
from pyspark.sql.functions import sum


In [2]:
# Initialize the Spark Session
spark=SparkSession.builder.appName("parquetFile").getOrCreate()

In [3]:
# Using PySpark create the Dataframes for the parquet files

aDF=spark.read.parquet("C:/Users/User/Downloads/data/resources/a.parquet")
bDF=spark.read.parquet("C:/Users/User/Downloads/data/resources/b.parquet")
cDF=spark.read.parquet("C:/Users/User/Downloads/data/resources/c.parquet")
dDF=spark.read.parquet("C:/Users/User/Downloads/data/resources/d.parquet")
eDF=spark.read.parquet("C:/Users/User/Downloads/data/resources/e.parquet")
fDF=spark.read.parquet("C:/Users/User/Downloads/data/resources/f.parquet")
gDF=spark.read.parquet("C:/Users/User/Downloads/data/resources/g.parquet")

In [10]:
gDF.printSchema()

root
 |-- Amount: double (nullable = true)
 |-- Value: string (nullable = true)
 |-- ProductNumber: string (nullable = true)
 |-- CategoryId: long (nullable = true)
 |-- Type: long (nullable = true)
 |-- IncotermsCity: string (nullable = true)
 |-- PayDate: timestamp_ntz (nullable = true)
 |-- TransactionId: long (nullable = true)



In [10]:
# print("The following is the DataFrame for the a.parquet file")
# aDF.show()
# print("The following is the DataFrame for the b.parquet file")
# bDF.show()
# print("The following is the DataFrame for the c.parquet file")
# cDF.show()
# print("The following is the DataFrame for the d.parquet file")
# dDF.show()
print("The following is the DataFrame for the e.parquet file")
eDF.show()
# print("The following is the DataFrame for the f.parquet file")
# fDF.show()
# print("The following is the DataFrame for the g.parquet file")
# gDF.show()

The following is the DataFrame for the e.parquet file
+-----------+----+
|CustomerId |Name|
+-----------+----+
|          1|   A|
|          2|   B|
|          3|   C|
|          4|   D|
|          5|   F|
|          6|   G|
|          7|   H|
|          8|   I|
|          9|   J|
|         10|   K|
|         11|   L|
|         12|   M|
|         13|   N|
|         14|   O|
|         15|   P|
|         16|   Q|
|         17|   R|
|         18|   T|
|         19|   S|
|         20|   V|
+-----------+----+
only showing top 20 rows



In [14]:
# Rename the "Name" column of the dDF to "ProgramName" (assumption)
# Rename the "Name" column of the cDF to "CategoryName" (assumption)

dDF = dDF.withColumnRenamed("Name","ProgramName")
cDF = cDF.withColumnRenamed("Name","CategoryName")
# dDF.show()

What is the revenue (in euros) created and received in 2021?

In [None]:
# Using the INNER JOIN function merge the DataFrames a and g on the "TransactionID" value

merged_df = aDF.join(gDF, on="TransactionId", how="inner")
merged_df.show()

In [None]:
# As we require the revenue that was created and received in 2021, the column "PayDate" is filtered to keep only the transactions that were paid in 2021

# Filter dataframe to keep only entries from the year 2021
filtered_df = merged_df.filter(col("PayDate").contains("2021"))

# Show the filtered dataframe
filtered_df.show()

In [9]:
# First assumption, convert the transactions that are in USD to EUR based on the average transaction rate of 2021. Source: https://www.exchangerates.org.uk/USD-EUR-spot-exchange-rates-history-2021.html

# Convert USD amounts to EUR using the conversion rate 0.8458
converted_df = filtered_df.withColumn("Amount", when(filtered_df["Value"] == "USD", filtered_df["Amount"] * 0.8458).otherwise(filtered_df["Amount"]))

# Change all instances of "USD" in the "Value" column to "EUR"
converted_df = converted_df.withColumn("Value", when(filtered_df["Value"] == "USD", "EUR").otherwise(filtered_df["Value"]))

# Show the converted dataframe
converted_df.show()

# Sum the values in the "Amount" column
total_revenue = converted_df.agg(sum("Amount")).collect()[0][0]

print(f"The revenue (in euros) created and received in 2021 is {total_revenue}")

+-------------+-------------------+------+----------+-----------------+--------+-----+--------------+----------+----+-------------+-------------------+
|TransactionId|       CreationDate|UserId|CustomerId|SalesDocumentItem|  Amount|Value| ProductNumber|CategoryId|Type|IncotermsCity|            PayDate|
+-------------+-------------------+------+----------+-----------------+--------+-----+--------------+----------+----+-------------+-------------------+
|     10000145|2021-10-10 00:00:00|  6064|       309|          1000842|   807.0| EUR |6001-1330-9400|        13|   1|          SON|2021-07-12 00:00:00|
|     10000146|2021-10-05 00:00:00|  7423|       950|          1000388|352.6986|  EUR|6829-1900-0100|         2|   1|       BOSTON|2021-10-23 00:00:00|
|     10000147|2021-03-18 00:00:00|  8564|       418|          1000229|   939.0| EUR |6159-1700-2100|        19|   1|          SON|2021-09-15 00:00:00|
|     10000148|2021-08-09 00:00:00|  6466|       732|          1000026|   203.0| EUR |60

In [None]:
# Second assumption where we filter the table to only keep the EUR values

filtered_df_EUR = filtered_df.filter(col("Value").contains("EUR"))

# filtered_df_EUR.show()

# Sum the values in the "Amount" column
total_revenue = filtered_df_EUR.agg(sum("Amount")).collect()[0][0]

print(f"The revenue (in euros) created and received in 2021 is {total_revenue} ")

Rank the programs on biggest contribution to the received revenue of 2021

In [15]:
# First start by merging the two dataframes "cDF" and "dDF" based on the "ProgramID" column

merged_c_d = cDF.join(dDF, on="ProgramId", how="inner")
merged_c_d.show()

+---------+----------+------------+-----------+
|ProgramId|CategoryId|CategoryName|ProgramName|
+---------+----------+------------+-----------+
|        4|         1|         DJS|          D|
|        4|         2|         CHS|          D|
|        4|         3|         DIS|          D|
|        1|         4|         IOP|          K|
|        2|         5|          JL|          L|
|        3|         6|         JDS|          S|
|        1|         7|         ASD|          K|
|        1|         8|        NULL|          K|
|        2|         9|         WER|          L|
|        4|        10|         RET|          D|
|        1|        11|         VBC|          K|
|        4|        12|         SDL|          D|
|        2|        13|         SDK|          L|
|        3|        14|        SDKL|          S|
|        2|        15|         CNX|          L|
|        4|        16|         XNS|          D|
|        4|        17|        NULL|          D|
|        3|        18|        NULL|     

In [18]:
# Then merge the combined cDF/dDF with the combined (filtered and converted) aDF/gDF, based on the "CategoryID" (for this case the merged aDF/gDF, as shown in the first assumption is used)

merged_cd_ag = merged_c_d.join(converted_df, on="CategoryID", how="inner")
merged_cd_ag.show()

+----------+---------+------------+-----------+-------------+-------------------+------+----------+-----------------+--------+-----+--------------+----+-------------+-------------------+
|CategoryId|ProgramId|CategoryName|ProgramName|TransactionId|       CreationDate|UserId|CustomerId|SalesDocumentItem|  Amount|Value| ProductNumber|Type|IncotermsCity|            PayDate|
+----------+---------+------------+-----------+-------------+-------------------+------+----------+-----------------+--------+-----+--------------+----+-------------+-------------------+
|        13|        2|         SDK|          L|     10000145|2021-10-10 00:00:00|  6064|       309|          1000842|   807.0| EUR |6001-1330-9400|   1|          SON|2021-07-12 00:00:00|
|         2|        4|         CHS|          D|     10000146|2021-10-05 00:00:00|  7423|       950|          1000388|352.6986|  EUR|6829-1900-0100|   1|       BOSTON|2021-10-23 00:00:00|
|        19|        4|         QWE|          D|     10000147|2021

In [20]:
# Now rank the Programs based on their contribution


# Calculate sum of the amounts per ProgramId
sum_df = merged_cd_ag.groupBy("ProgramId").agg(sum("Amount").alias("TotalAmount"))

# Rank the ProgramIds based on the sum of amounts
ranked_df = sum_df.orderBy(col("TotalAmount").desc())

# Show the resulted rank
ranked_df.show()

+---------+-----------------+
|ProgramId|      TotalAmount|
+---------+-----------------+
|        4|8495589.937799998|
|        1|       23551.7634|
|        2|       17950.1846|
|        3|       14776.9546|
|        5|        6424.3366|
+---------+-----------------+



In [None]:
spark.stop()