<a href="https://colab.research.google.com/github/JasminiSantos/TDE3-Apache-Spark-SQL/blob/main/TDE3_Apache_Spark_SQL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=b071a2fe4dc17c43c029c1f0cd45bf46112c29f249b667ee4c6e28103241cbb0
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
!wget https://jpbarddal.github.io/assets/data/bigdata/transactions_amostra.csv.zip
!unzip transactions_amostra.csv.zip

--2023-05-20 00:14:55--  https://jpbarddal.github.io/assets/data/bigdata/transactions_amostra.csv.zip
Resolving jpbarddal.github.io (jpbarddal.github.io)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jpbarddal.github.io (jpbarddal.github.io)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 47513871 (45M) [application/zip]
Saving to: ‘transactions_amostra.csv.zip’


2023-05-20 00:14:55 (198 MB/s) - ‘transactions_amostra.csv.zip’ saved [47513871/47513871]

Archive:  transactions_amostra.csv.zip
  inflating: transactions_amostra.csv  
  inflating: __MACOSX/._transactions_amostra.csv  


In [None]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder\
.master('local[*]')\
.appName('tde3').getOrCreate()

# Read the dataset into a DataFrame
df = spark.read.csv("transactions_amostra.csv", header=True, inferSchema=True, sep=";")

# Register the DataFrame as a temporary view
df.createOrReplaceTempView("transactions")

In [None]:
columns = df.take(1)[0].split(";")
print("Columns of the dataset:")
for column in columns:
    print(column)

In [None]:
# Problem 1: Number of transactions involving Brazil

# Use Spark SQL to query the number of transactions involving Brazil
transactions_in_brazil = spark.sql("SELECT COUNT(*) AS count FROM transactions WHERE country_or_area = 'Brazil'")

# Retrieve the count value
count = transactions_in_brazil.first()["count"]

print("Number of transactions involving Brazil:", count)

Number of transactions involving Brazil: 27463


In [None]:
# Problem 2: Number of transactions per flow type and year

# Calculate the number of transactions per flow and year
transactions_per_flow_and_year = spark.sql("SELECT Flow, Year, COUNT(*) AS count FROM transactions GROUP BY Flow, Year")

# Show the results
transactions_per_flow_and_year.show()

+---------+----+-----+
|     Flow|Year|count|
+---------+----+-----+
|Re-Export|2006| 2431|
|   Import|2009|31001|
|   Import|2005|31124|
|   Export|1995|11547|
|Re-Export|1999| 2046|
|Re-Import|2000|  952|
|   Export|2000|16855|
|   Export|2001|16759|
|   Import|2015|28834|
|Re-Export|1993|  973|
|Re-Export|2013| 2225|
|Re-Export|1998| 1519|
|Re-Export|1997| 1605|
|Re-Export|2016| 2298|
|   Import|1998|24881|
|   Export|1993| 7766|
|Re-Export|1992|  764|
|   Import|1991| 6185|
|   Import|2011|31301|
|   Import|1990| 4866|
+---------+----+-----+
only showing top 20 rows



In [None]:
# Problem 3: Average commodity values per year

# Calculate average commodity values per year
average_commodity_values = spark.sql("SELECT year, AVG(trade_usd) AS average_value FROM transactions GROUP BY year")

# Show the results
average_commodity_values.show()

+----+--------------------+
|year|       average_value|
+----+--------------------+
|1990| 1.172426586778952E7|
|2003|1.3028917611334749E7|
|2007|2.3710673174875777E7|
|2015|   3.1115574884196E7|
|2006|2.1175872541099638E7|
|2013| 3.306115128882995E7|
|1997|   9549881.214776853|
|1988| 1.864297055638571E7|
|1994|1.1350325049077941E7|
|2014| 4.612040441345007E7|
|2004|1.5388487793083541E7|
|1991| 1.306922385515173E7|
|1996|1.1945524161286663E7|
|1989|1.1263871329920229E7|
|1998|1.0175610459598826E7|
|2012|3.9028921881444596E7|
|2009|2.5068409504465386E7|
|2016| 2.941832757526777E7|
|1995|1.2286454103356835E7|
|2001|   9942220.288239626|
+----+--------------------+
only showing top 20 rows



In [None]:
# Problem 4: Average price of commodities per unit type, year, and category in the export flow in Brazil

# Calculate average price per unit type, year, and category in export flow in Brazil
average_price_per_unit_type_year_category = spark.sql("""
    SELECT Year, category, AVG(trade_usd) AS average_price
    FROM transactions
    WHERE Flow = 'Export' AND country_or_area = 'Brazil'
    GROUP BY  Year, Category
""")

# Show the results
average_price_per_unit_type_year_category.show()

+----+--------------------+-------------------+
|Year|            category|      average_price|
+----+--------------------+-------------------+
|2007|30_pharmaceutical...|            95057.0|
|1993|27_mineral_fuels_...|          5313712.5|
|2016|37_photographic_o...| 2319.6666666666665|
|2000|     01_live_animals|          1952949.0|
|1992|15_animal_vegetab...|          2108165.4|
|2008|38_miscellaneous_...|       4.49661584E7|
|2014|68_stone_plaster_...|          1.84138E7|
|2006|27_mineral_fuels_...| 3407871.6666666665|
|1993|28_inorganic_chem...|         4578559.24|
|2008|28_inorganic_chem...|         4449268.25|
|1994|39_plastics_and_a...| 3172772.5454545454|
|2005|02_meat_and_edibl...|       2.09560584E8|
|2007|48_paper_paperboa...|5.556047253333333E7|
|1991|93_arms_and_ammun...|            69896.0|
|1999|80_tin_and_articl...|        1.5815404E7|
|1994|17_sugars_and_sug...|8.347317433333333E7|
|1997|60_knitted_or_cro...|         3641387.75|
|1999|22_beverages_spir...|          203

In [None]:
# Problem 5: Maximum, minimum, and mean transaction price per unit type and year
transaction_stats_per_unit_type_and_year = df.groupBy("Unit", "Year") \
    .agg({"Price": "max", "Price": "min", "Price": "avg"})
transaction_stats_per_unit_type_and_year.show()

+------+--------------------+----------------+
|  Flow|           Commodity|  total_quantity|
+------+--------------------+----------------+
|Export|Iron ore, concent...|3.79546246752E11|
+------+--------------------+----------------+



In [None]:
# Problem 6: Country with the largest average commodity price in the Export flow

# Execute the SQL query
largest_avg_price_country = spark.sql("""
    SELECT country_or_area, AVG(trade_usd) AS avg_price
    FROM transactions
    WHERE Flow = 'Export'
    GROUP BY country_or_area
    ORDER BY avg_price DESC
    LIMIT 1
""")

largest_avg_price_country.show()

+---------------+--------------------+
|country_or_area|           avg_price|
+---------------+--------------------+
|         Angola|1.636966606814285...|
+---------------+--------------------+



In [None]:
# Problem 7: Most commercialized commodity in 2016, per flow type

# Execute the SQL query
most_commercialized_commodity_2016 = spark.sql("""
    SELECT Flow, Commodity, SUM(quantity) AS total_amount
    FROM transactions
    WHERE Year = '2016'
    GROUP BY Flow, Commodity
    ORDER BY total_amount DESC
    LIMIT 1
""")
most_commercialized_commodity_2016.show()

+------+--------------------+----------------+
|  Flow|           Commodity|    total_amount|
+------+--------------------+----------------+
|Export|Iron ore, concent...|3.79546246752E11|
+------+--------------------+----------------+



In [None]:
# Stop the SparkSession
spark.stop()