<a href="https://colab.research.google.com/github/JasminiSantos/TDE2-Spark-RDDs/blob/main/TDE2_Spark_RDDs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!wget https://jpbarddal.github.io/assets/data/bigdata/transactions_amostra.csv.zip
!unzip transactions_amostra.csv.zip

--2023-05-19 22:27:53--  https://jpbarddal.github.io/assets/data/bigdata/transactions_amostra.csv.zip
Resolving jpbarddal.github.io (jpbarddal.github.io)... 185.199.108.153, 185.199.109.153, 185.199.110.153, ...
Connecting to jpbarddal.github.io (jpbarddal.github.io)|185.199.108.153|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 47513871 (45M) [application/zip]
Saving to: ‘transactions_amostra.csv.zip.1’


2023-05-19 22:27:53 (292 MB/s) - ‘transactions_amostra.csv.zip.1’ saved [47513871/47513871]

Archive:  transactions_amostra.csv.zip
replace transactions_amostra.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session; 'local[*]' runs one worker thread per available core
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('tde2')
    .getOrCreate()
)
sc = spark.sparkContext

# Read the CSV as an RDD of raw text lines (fields are ';'-separated; line 1 is the header)
dataset = sc.textFile("transactions_amostra.csv")

In [None]:
# The first line of the file is the header: split it on ';' to get the column names
columns = dataset.take(1)[0].split(";")
print("Columns of the dataset:")
# One column name per line
print("\n".join(columns))

In [None]:
# Problem 1: The number of transactions involving Brazil

# Keep only transactions whose country field (column 0) is exactly "Brazil".
# A bare substring test (`"Brazil" in line`) would also count rows where "Brazil"
# appears in some other field, e.g. inside a commodity description.
brazil_transactions = dataset.filter(lambda line: line.split(";")[0] == "Brazil")

# Convert to PairRDD with a constant key
brazil_transactions_with_key = brazil_transactions.map(lambda line: ("brazil", 1))

# Calculate the total number of transactions involving Brazil
total_brazil_transactions = brazil_transactions_with_key.reduceByKey(lambda a, b: a + b)

# Retrieve the count. When nothing matches, the reduced RDD is empty and
# collect()[0] would raise IndexError, so default to 0 instead.
count = dict(total_brazil_transactions.collect()).get("brazil", 0)

print("Total transactions involving Brazil:", count)

In [None]:
# Problem 2: The number of transactions per flow type and year

# The first line holds the column names, not a transaction: drop it so it is not
# counted as its own (flow, year) group.
header_line = dataset.first()

# Key each record by (flow = column 4, year = column 1), splitting each line only once
transactions_per_flow_year = dataset.filter(lambda line: line != header_line) \
    .map(lambda line: line.split(";")) \
    .map(lambda fields: ((fields[4], fields[1]), 1)) \
    .reduceByKey(lambda a, b: a + b)

# Sorted so the report is stable across runs
for flow_year, count in sorted(transactions_per_flow_year.collect()):
    print("Flow type and year:", flow_year, count)

In [None]:
# Problem 3: The average of commodity values per year

# Process the dataset
# The first line holds the column names; float() on its column 5 would fail, so skip it.
header_line = dataset.first()

# (year = column 1, commodity value = column 5) pairs
commodity_values = dataset.filter(lambda line: line != header_line) \
    .map(lambda line: line.split(";")) \
    .map(lambda fields: (fields[1], float(fields[5])))

# Per-year running (sum, count): seqOp folds one value, combOp merges partition results
commodity_values_per_year = commodity_values.aggregateByKey(
    (0.0, 0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
)
average_values_per_year = commodity_values_per_year.mapValues(lambda acc: acc[0] / acc[1])

# Print the average itself (previously omitted), in year order
for year, average_value in sorted(average_values_per_year.collect()):
    print("Average commodity value for year", year, ":", average_value)

In [None]:
# Problem 4: The average price of commodities per unit type, year, and category in the export flow in Brazil

# Process the dataset
def _is_brazil_export(line):
    """Return True when the record's country (column 0) is Brazil and its flow (column 4) is Export.

    Compares whole fields rather than testing `"Brazil" in line`, which would also
    match "Brazil" appearing in any other field of the record.
    """
    fields = line.split(";")
    return fields[0] == "Brazil" and fields[4] == "Export"


export_flow_brazil = dataset.filter(_is_brazil_export)

# Key: (unit type = column 7, year = column 1, category = column 9); value: price (column 5).
# NOTE(review): the original keyed on column 8, which holds the per-row quantity (a number),
# not the category; category is assumed to be column 9. Confirm against the printed column list.
commodity_price_per_unit_type = export_flow_brazil \
    .map(lambda line: line.split(";")) \
    .map(lambda fields: ((fields[7], fields[1], fields[9]), float(fields[5])))

# Per-key running (sum, count), then the mean
commodity_price_per_unit_type_category = commodity_price_per_unit_type.aggregateByKey(
    (0.0, 0),
    lambda acc, price: (acc[0] + price, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
)
average_price_per_unit_type_category = commodity_price_per_unit_type_category.mapValues(lambda acc: acc[0] / acc[1])
print("Average price of commodities per unit type, year, and category in the export flow in Brazil:")

# Collect the results and print each element (sorted by key for a stable report)
result = sorted(average_price_per_unit_type_category.collect())
for element in result:
    print(element)

In [None]:
# Problem 5: The maximum, minimum, and mean transaction price per unit type and year

# Process the dataset
# The first line holds the column names; float() on its column 5 would fail, so skip it.
header_line = dataset.first()

# ((unit type = column 7, year = column 1), price = column 5)
transactions = dataset.filter(lambda line: line != header_line) \
    .map(lambda line: line.split(";")) \
    .map(lambda fields: ((fields[7], fields[1]), float(fields[5])))

# Accumulator per key: (max, min, sum, count). -inf/+inf are neutral for max/min.
price_stats_per_unit_year = transactions.aggregateByKey(
    (float('-inf'), float('inf'), 0.0, 0),
    lambda a, b: (max(a[0], b), min(a[1], b), a[2] + b, a[3] + 1),
    lambda a, b: (max(a[0], b[0]), min(a[1], b[1]), a[2] + b[2], a[3] + b[3]),
)

# Calculate mean transaction price
mean_price_per_unit_year = price_stats_per_unit_year.mapValues(lambda v: v[2] / v[3])

# Iterate over the results and print maximum, minimum, and mean transaction prices per unit type and year
# (this output was previously missing)
for (unit_type, year), (max_price, min_price, total, n) in sorted(price_stats_per_unit_year.collect()):
    print("Unit type:", unit_type, "| Year:", year,
          "| Max:", max_price, "| Min:", min_price, "| Mean:", total / n)

In [None]:
# Problem 6: The country with the largest average commodity price in the Export flow

# Keep only the rows whose flow field (column 4) is exactly "Export"
export_flow = dataset.filter(lambda line: line.split(";")[4] == "Export")

# Turn every export row into a (country, price) pair: column 0 is the country,
# column 5 the commodity value
country_price = (
    export_flow
    .map(lambda line: line.split(";"))
    .map(lambda fields: (fields[0], float(fields[5])))
)


def _add_price(running, price):
    """Fold one price into a per-partition (sum, count) accumulator."""
    return running[0] + price, running[1] + 1


def _merge_running(left, right):
    """Combine two (sum, count) accumulators coming from different partitions."""
    return left[0] + right[0], left[1] + right[1]


# Per-country (sum of prices, number of prices)
country_sum_count = country_price.aggregateByKey((0.0, 0), _add_price, _merge_running)

# Per-country mean price
country_average_price = country_sum_count.mapValues(lambda running: running[0] / running[1])

# The (country, average) pair whose average is highest
largest_average_price_country = country_average_price.max(key=lambda pair: pair[1])
top_country, top_average = largest_average_price_country

print("Country with the largest average commodity price in the Export flow:", top_country, top_average)

In [None]:
# Problem 7: The most commercialized commodity (summing the quantities) in 2016, per flow type

# 2016 rows only (year = column 1). Rows with a blank quantity (column 8) are skipped,
# since float("") would raise. NOTE(review): confirm whether blank quantities occur.
records_2016 = dataset.map(lambda line: line.split(";")) \
    .filter(lambda fields: fields[1] == "2016" and fields[8].strip() != "")

# Total quantity per (commodity = column 3, flow = column 4)
commodity_quantities_2016 = records_2016 \
    .map(lambda fields: ((fields[3], fields[4]), float(fields[8]))) \
    .reduceByKey(lambda a, b: a + b)

# Re-key by flow and keep, for each flow, the (commodity, total quantity) with the largest total.
# The previous version took one global max (computed twice) instead of one result per flow type.
most_commercialized_per_flow = commodity_quantities_2016 \
    .map(lambda kv: (kv[0][1], (kv[0][0], kv[1]))) \
    .reduceByKey(lambda a, b: a if a[1] >= b[1] else b)

for flow, (commodity, total_quantity) in sorted(most_commercialized_per_flow.collect()):
    print("Most commercialized commodity in 2016 for flow", flow, ":", commodity, total_quantity)