In [None]:
# Installing spark
!pip install pyspark

In [None]:
# Importing the dataset
!wget https://jpbarddal.github.io/assets/data/bigdata/transactions_amostra.csv.zip
!unzip transactions_amostra.csv.zip

In [3]:
# Building the Spark session and exposing its SparkContext as `sc`
from pyspark.sql import SparkSession

# The builder is configured with the master URL 'local[*]' and the
# application name 'thomas'; getOrCreate() hands back the session object.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('thomas')
    .getOrCreate()
)

# The RDD API used in the rest of the notebook goes through the context
sc = spark.sparkContext

**Loading the file and removing the header**

In [4]:
# Reading the CSV as an RDD of raw text lines
raw_lines = sc.textFile('transactions_amostra.csv')

# The first line of the file is taken as the header
header = raw_lines.first()


def is_data_line(line):
    """Return True for every line that differs from the header line."""
    return line != header


# Keeping only the data lines and previewing the first ten of them
rdd_commodities = raw_lines.filter(is_data_line)
rdd_commodities.take(10)

['Belgium;2016;920510;Brass-wind instruments;Export;571297;3966.0;Number of items;4135.0;92_musical_instruments_parts_and_accessories',
 'Guatemala;2008;660200;Walking-sticks, seat-sticks, whips, etc.;Export;35022;5575.0;Number of items;10089.0;66_umbrellas_walking_sticks_seat_sticks_whips_etc',
 'Barbados;2006;220210;Beverage waters, sweetened or flavoured;Re-Export;81058;44458.0;Volume in litres;24113.0;22_beverages_spirits_and_vinegar',
 'Tunisia;2016;780411;Lead foil of a thickness <2mm;Import;4658;121.0;Weight in kilograms;121.0;78_lead_and_articles_thereof',
 'Lithuania;1996;560110;Sanitary towels, diapers and similar articles;Export;76499;5419.0;Weight in kilograms;5419.0;56_wadding_felt_nonwovens_yarns_twine_cordage_etc',
 'Denmark;2011;310100;Animal or vegetable fertilizers, in packs >10 kg;Export;4903675;19028440.0;Weight in kilograms;19028440.0;31_fertilizers',
 'Thailand;1994;920290;String musical instruments nes;Import;2088672;0.0;Number of items;59595.0;92_musical_instrum

**1: The number of transactions involving Brazil.**

In [5]:
# Keeping the lines whose first ';'-separated field is exactly 'Brazil'.
# Only the first field is inspected, so a single split is enough.
rdd_brazil = rdd_commodities.filter(lambda line: line.split(';', 1)[0] == 'Brazil')

# Number of matching transactions
rdd_brazil.count()

27463

**2: The number of transactions per flow type and year.**

In [8]:
# Splitting each line once, then emitting ((year, flow), 1) per transaction:
# field 1 is the year and field 4 is the flow type
rdd_flow_year = (
    rdd_commodities
    .map(lambda line: line.split(';'))
    .map(lambda fields: ((fields[1], fields[4]), 1))
)

# Adding up the ones gives the number of transactions per (year, flow) key
cont = rdd_flow_year.reduceByKey(lambda left, right: left + right)

# Sorting by key in descending order and showing the first ten pairs
ordering = cont.sortBy(lambda pair: pair[0], ascending=False)
ordering.take(10)

[(('2016', 'Re-Import'), 939),
 (('2016', 'Re-Export'), 2298),
 (('2016', 'Import'), 22583),
 (('2016', 'Export'), 15260),
 (('2015', 'Re-Import'), 1479),
 (('2015', 'Re-Export'), 2646),
 (('2015', 'Import'), 28834),
 (('2015', 'Export'), 17756),
 (('2014', 'Re-Import'), 1457),
 (('2014', 'Re-Export'), 2806)]

**3: The average of commodity values per year.**

In [15]:
# Emitting (year, (price, 1)) for every transaction.
# Each line is split only once; field 1 is the year and field 5 is the value
# that gets averaged.
rdd_average_value = (
    rdd_commodities
    .map(lambda line: line.split(';'))
    .map(lambda fields: (fields[1], (float(fields[5]), 1)))
)

# Adding up the prices and the number of transactions per year.
# The result is deliberately not called `sum`: rebinding that name would hide
# the Python built-in for every cell executed afterwards.
price_sum_by_year = rdd_average_value.reduceByKey(
    lambda x, y: (x[0] + y[0],   # summed price
                  x[1] + y[1]))  # number of transactions

# Average = summed price / number of transactions
average = price_sum_by_year.mapValues(lambda x: x[0] / x[1])

# Ordering by year in an ascending manner and displaying the first ten pairs.
# (Not named `sorted`, for the same reason as above.)
average_by_year = average.sortBy(lambda x: x[0], ascending=True)
average_by_year.take(10)

[('1988', 18642970.55638571),
 ('1989', 11263871.329920229),
 ('1990', 11724265.86778952),
 ('1991', 13069223.85515173),
 ('1992', 9402960.863025468),
 ('1993', 10353959.855309162),
 ('1994', 11350325.049077941),
 ('1995', 12286454.103356835),
 ('1996', 11945524.161286663),
 ('1997', 9549881.214776853)]

**4: The average price of commodities per unit type, year, and category in the export flow
in Brazil.**

In [16]:
# Keeping only Brazil's transactions in the Export flow. No year filter is
# applied here: the year is part of the grouping key instead.
# Each line is split once, and the key/value pair is built from its fields:
#   key   = (year, unit type, category)  -> fields 1, 7 and 9
#   value = (price, 1)                   -> field 5 and a counter
rdd_average_price = (
    rdd_commodities
    .map(lambda line: line.split(';'))
    .filter(lambda fields: fields[0] == 'Brazil' and fields[4] == 'Export')
    .map(lambda fields: ((fields[1], fields[7], fields[9]),
                         (float(fields[5]), 1)))
)

# Adding up the prices and the number of transactions per key.
# The result is deliberately not called `sum`: rebinding that name would hide
# the Python built-in for every cell executed afterwards.
price_sum_by_group = rdd_average_price.reduceByKey(
    lambda x, y: (x[0] + y[0],   # summed price
                  x[1] + y[1]))  # number of transactions

# Average = summed price / number of transactions
average = price_sum_by_group.mapValues(lambda x: x[0] / x[1])

# Ordering by key in a descending manner and displaying the first ten pairs.
# (Not named `sorted`, for the same reason as above.)
average_by_group = average.sortBy(lambda x: x[0], ascending=False)
average_by_group.take(10)

[(('2016',
   'Weight in kilograms',
   '97_works_of_art_collectors_pieces_and_antiques'),
  305909.0),
 (('2016', 'Weight in kilograms', '96_miscellaneous_manufactured_articles'),
  2152944.5),
 (('2016', 'Weight in kilograms', '95_toys_games_sports_requisites'),
  667820.0),
 (('2016',
   'Weight in kilograms',
   '94_furniture_lighting_signs_prefabricated_buildings'),
  5116585.142857143),
 (('2016', 'Weight in kilograms', '91_clocks_and_watches_and_parts_thereof'),
  1672.0),
 (('2016', 'Weight in kilograms', '88_aircraft_spacecraft_and_parts_thereof'),
  169835892.0),
 (('2016', 'Weight in kilograms', '87_vehicles_other_than_railway_tramway'),
  143839036.33333334),
 (('2016',
   'Weight in kilograms',
   '86_railway_tramway_locomotives_rolling_stock_equipmen'),
  4160027.0),
 (('2016', 'Weight in kilograms', '83_miscellaneous_articles_of_base_metal'),
  418903402.6666667),
 (('2016',
   'Weight in kilograms',
   '82_tools_implements_cutlery_etc_of_base_metal'),
  1865728.5)]

**5: The maximum, minimum, and mean transaction price per unit type and year.**

In [17]:
def _price_record(line):
    """Turn one CSV line into ((year, unit type), (max, min, sum, count)).

    The line is split once and the price (field 5) is parsed once; the same
    value seeds the running maximum, minimum and sum, and the count starts at 1.
    """
    fields = line.split(';')
    price = float(fields[5])
    return ((fields[1], fields[7]), (price, price, price, 1))


def _merge_stats(x, y):
    """Combine two (max, min, sum, count) tuples that share the same key."""
    return (
        x[0] if x[0] > y[0] else y[0],  # larger of the two maxima
        x[1] if x[1] < y[1] else y[1],  # smaller of the two minima
        x[2] + y[2],                    # summed price
        x[3] + y[3],                    # number of transactions
    )


# Mapping (year, unit type) as key and the running statistics as value
rdd_maximum_minimum = rdd_commodities.map(_price_record)

# Reducing per key. The result is deliberately not called `sum`: rebinding
# that name would hide the Python built-in for every cell executed afterwards.
stats_by_year_unit = rdd_maximum_minimum.reduceByKey(_merge_stats)

# Formatting max, min and mean (summed price / count) as a single string
average = stats_by_year_unit.mapValues(lambda x: (f"MAX:{x[0]}, MIN: {x[1]}, MÉDIA: {x[2] / x[3]}"))

# Ordering by key in a descending manner and displaying the first ten pairs.
# (Not named `sorted`, for the same reason as above.)
stats_sorted = average.sortBy(lambda x: x[0], ascending=False)
stats_sorted.take(10)

[(('2016', 'Weight in kilograms'),
  'MAX:54041714444.0, MIN: 1.0, MÉDIA: 29000750.044637196'),
 (('2016', 'Weight in carats'),
  'MAX:9557468.0, MIN: 7957993.0, MÉDIA: 8757730.5'),
 (('2016', 'Volume in litres'),
  'MAX:1547191989.0, MIN: 11.0, MÉDIA: 28192937.36598891'),
 (('2016', 'Volume in cubic meters'),
  'MAX:4052653026.0, MIN: 203.0, MÉDIA: 45403992.22794118'),
 (('2016', 'Thousands of items'),
  'MAX:8554139.0, MIN: 1500.0, MÉDIA: 2027251.0'),
 (('2016', 'Number of pairs'),
  'MAX:1865315579.0, MIN: 20.0, MÉDIA: 39341504.41324201'),
 (('2016', 'Number of packages'),
  'MAX:115285573.0, MIN: 2666.0, MÉDIA: 6871851.043478261'),
 (('2016', 'Number of items'),
  'MAX:19782901523.0, MIN: 1.0, MÉDIA: 35447054.15630021'),
 (('2016', 'Length in metres'),
  'MAX:961206.0, MIN: 19.0, MÉDIA: 74562.9512195122'),
 (('2016', 'Electrical energy in thousands of kilowatt-hours'),
  'MAX:1065282687.0, MIN: 1128262.0, MÉDIA: 233217751.375')]

**6: The country with the largest average commodity price in the Export flow.**

In [18]:
# Splitting each line once, keeping the Export flow (field 4) and emitting
# (country, (price, 1)) from fields 0 and 5
rdd_largest_price = (
    rdd_commodities
    .map(lambda line: line.split(';'))
    .filter(lambda fields: fields[4] == 'Export')
    .map(lambda fields: (fields[0], (float(fields[5]), 1)))
)

# Per country: summed price and number of transactions
sum1 = rdd_largest_price.reduceByKey(
    lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1])
)

# Per country: summed price divided by number of transactions
average = sum1.mapValues(lambda totals: totals[0] / totals[1])

# The pair with the greatest average is the answer; it is displayed as the cell output
largest = average.max(key=lambda pair: pair[1])
largest

('Angola', 16369666068.142857)

**7: The most commercialized commodity (summing the quantities) in 2016, per flow
type.**

In [19]:
# Keeping only the transactions from 2016 and emitting
# ((flow, commodity description), amount) for each of them.
# Each line is split once: field 1 is the year, field 4 the flow, field 3 the
# commodity description and field 8 the amount.
rdd_most_commercialized = (
    rdd_commodities
    .map(lambda line: line.split(';'))
    .filter(lambda fields: fields[1] == '2016')
    .map(lambda fields: ((fields[4], fields[3]), float(fields[8])))
)

# Summing the amounts per (flow, commodity description).
# The result is deliberately not called `sum`: rebinding that name would hide
# the Python built-in for every cell executed afterwards.
quantity_by_flow_commodity = rdd_most_commercialized.reduceByKey(lambda x, y: x + y)

# Re-keying by flow alone, with (commodity description, summed amount) as
# value, then keeping for each flow the pair with the largest summed amount
rdd_2 = (
    quantity_by_flow_commodity
    .map(lambda x: (x[0][0], (x[0][1], x[1])))
    .reduceByKey(lambda x, y: (x if x[1] > y[1] else y))
)
rdd_2.take(10)

[('Export',
  ('Iron ore, concentrate, not iron pyrites,unagglomerate', 379546246752.0)),
 ('Import',
  ('Petroleum oils, oils from bituminous minerals, crude', 258289373308.0)),
 ('Re-Export',
  ('Safety razor blades, including blanks in strips', 1261968000.0)),
 ('Re-Import',
  ('Chem wood pulp, soda/sulphate, non-conifer, bleached', 38774873.0))]