In [6]:
from pyspark.sql import SparkSession # Import SparkSession
from pyspark.sql.functions import col # Import Columns to convert ColumnType
from pyspark.sql.functions import desc # Import Descending to sort in descending manner
import time # Import Time function to calculate SparkSession execution duration

# Start (or reuse) a Spark session and load the payments CSV.
# header=True takes column names from the first row; with no schema given,
# every column comes in as a string, hence the explicit casts below.
spark = (
    SparkSession.builder
    .appName("Read CSV file and aggregate received by coutry")
    .getOrCreate()
)

# Cast the columns used downstream: Payment_ID, Received and Activity_type to
# integers (so Received can be averaged) and Payment_date_time to a date.
# NOTE(review): cast('date') keeps only the calendar day; use 'timestamp' if
# the time-of-day part is needed — confirm against the data.
# NOTE: despite the name, RDD holds a DataFrame, not an RDD.
RDD = (
    spark.read.csv('Spreadsheets/Payments.csv', header=True)
    .withColumn("Payment_ID", col("Payment_ID").cast('int'))
    .withColumn("Received", col("Received").cast("int"))
    .withColumn("Payment_date_time", col("Payment_date_time").cast('date'))
    .withColumn("Activity_type", col("Activity_type").cast("int"))
)

# Second name for the same DataFrame, used by the count task. This is an
# alias, not a copy; Spark DataFrames are immutable, so sharing it is safe.
RDD2 = RDD

In [7]:
# First data-analysis task: average received payment per country, highest first.
# The aggregate is stored under a new name so the source DataFrame RDD stays
# intact and this cell can be re-run (rebinding RDD here made a second run fail,
# since the aggregated frame no longer has a 'Received' column).
start_time = time.perf_counter()  # monotonic clock, suited to measuring durations
avg_received_by_country = RDD.groupBy('Country').avg('Received')
# show() is the action that triggers the Spark job, so the work is inside the timing.
avg_received_by_country.orderBy(desc('avg(Received)')).show()
print("--- %s seconds ---" % (time.perf_counter() - start_time))  # elapsed time
print()

+--------------------+------------------+
|             Country|     avg(Received)|
+--------------------+------------------+
|             Grenada|5409.6322314049585|
|          San Marino| 5409.388888888889|
|              Tuvalu| 5397.133709981167|
|              Uganda| 5390.805168986083|
|              Mexico|  5388.51593625498|
|             Myanmar| 5383.053130929791|
|             Belarus|  5380.60594059406|
|Saint Vincent and...|  5367.74269005848|
|         Philippines| 5357.794059405941|
|             Eritrea| 5357.704166666666|
|               Tonga| 5344.640977443609|
|      United Kingdom| 5341.944329896907|
|              Brunei|5335.6418511066395|
|             Finland| 5331.593810444874|
|                Iran|  5314.49387755102|
|             Tunisia| 5309.850661625709|
|            Ethiopia|          5306.792|
|Saint Kitts and N...| 5302.995824634656|
|              Rwanda| 5302.284294234592|
|          Luxembourg|5289.8292181069955|
+--------------------+------------------+

In [8]:
# Second data-analysis task: number of payments per country, most first.
# The result is stored under a new name so RDD2 keeps pointing at the full
# payments DataFrame (rebinding RDD2 here made a re-run silently report a
# count of 1 for every country).
start_time = time.perf_counter()  # monotonic clock, suited to measuring durations
payments_by_country = RDD2.groupBy('Country').count()
# show() is the action that triggers the Spark job, so the work is inside the timing.
payments_by_country.orderBy(desc('count')).show()
print("--- %s seconds ---" % (time.perf_counter() - start_time))  # elapsed time

+-----------+-----+
|    Country|count|
+-----------+-----+
|    Ecuador|  569|
|    Comoros|  566|
|     Latvia|  560|
|   Slovakia|  558|
|Saint Lucia|  557|
|     Turkey|  555|
|   Dominica|  554|
|      Nauru|  551|
|     Angola|  550|
|    Andorra|  549|
|   Pakistan|  548|
|     Greece|  548|
|       Mali|  548|
|     France|  548|
|Afghanistan|  546|
|    Vanuatu|  542|
|      Qatar|  541|
|  Macedonia|  540|
|   Barbados|  539|
|   Cambodia|  539|
+-----------+-----+
only showing top 20 rows

--- 0.3567471504211426 seconds ---
