<a href="https://colab.research.google.com/github/alfredo1996/Test-Tecnico-Big-Profiles/blob/main/Exercises.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spark installation

This code is here because it is necessary to install Java, Spark and findspark on Colab.

In [None]:
# Java 8 runtime used by Spark 3.1.1 (JAVA_HOME is pointed at it in the next cell)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# The old www-us.apache.org mirror is gone and 3.1.1 is no longer on the live
# mirrors; the Apache archive keeps every released Spark version.
!wget -q https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz
!tar -xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark



In [None]:
import time
import os

# Tell findspark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop2.7",
})

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Local Spark session whose web UI is served on port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

spark

# Exercise 1

Execution time on Colab: approximately 1 minute



In [None]:
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as f
from pyspark.sql.functions import row_number, monotonically_increasing_id, expr
from pyspark.sql import Window

# spark.read.text produces one row per input line in a single string column
# called "value"; that column is replaced by its integer cast so it can be summed.
data = (
    spark.read
    .text("/content/sample_data/exercise_1.txt")
    .selectExpr("CAST(value AS INT) AS value")
)

def sumInsidePartition(partition):
  """Yield the sum of the first field of every row in *partition*.

  Meant for RDD.mapPartitions: it is a generator that yields exactly one
  value per partition (0 for an empty partition). Null values, which the
  integer cast produces for non-numeric lines, are skipped, as SQL SUM does.
  """
  yield sum(row[0] for row in partition if row[0] is not None)

# Sum the rows of each of the 100 partitions. Note: repartition() shuffles the
# rows, so a partition holds a scattered subset of the file rather than a
# contiguous block of lines.
# mapPartitions yields one Python int per partition; the "bigint" schema stores
# them in a long column named "value" (a "string" schema would store them as
# text and make the later sum return doubles).
df = data.rdd.repartition(100).mapPartitions(sumInsidePartition).toDF("bigint")

# Number the partition sums 1..N in the order they come out of the RDD;
# this index is both the output key and the ordering for the running total.
df = df.withColumn(
    "index",
    row_number().over(Window.orderBy(monotonically_increasing_id())),
)

# Running total of the partition sums, ordered by index
df = df.withColumn("Cumulative Sum", expr("sum(value) over (order by index)"))

# Keep only the index and the running total. show() returns None, so it is
# called on its own instead of being assigned back to df.
df = df.drop("value")
df.show()

+-----+--------------+
|index|Cumulative Sum|
+-----+--------------+
|    1|      -48479.0|
|    2|      -93898.0|
|    3|     -141829.0|
|    4|     -192245.0|
|    5|     -241620.0|
|    6|     -290349.0|
|    7|     -340332.0|
|    8|     -388562.0|
|    9|     -440468.0|
|   10|     -488113.0|
|   11|     -540788.0|
|   12|     -591341.0|
|   13|     -646322.0|
|   14|     -696615.0|
|   15|     -746863.0|
|   16|     -798172.0|
|   17|     -851906.0|
|   18|     -901855.0|
|   19|     -955747.0|
|   20|    -1005959.0|
+-----+--------------+
only showing top 20 rows



After the first solution, I wrote a second one to improve performance, because the RDD repartition in the first solution was too slow. The improvement was approximately 55%.

Execution time on Colab: approximately 27 seconds



In [None]:
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F
from pyspark.sql.functions import row_number, monotonically_increasing_id, expr
from pyspark.sql import Window

# One row per line of the text file, in a string column "value", which is
# swapped for its integer cast so the lines can be aggregated numerically.
data = (
    spark.read
    .text("/content/sample_data/exercise_1.txt")
    .selectExpr("CAST(value AS INT) AS value")
)

# Number of rows per chunk when the file is split into 100 chunks
# (true division, so it may be fractional)
window_cardinality = data.select("value").count() / 100

# Chunk index: number the rows 1..N in the order given by
# monotonically_increasing_id(), then map row r to ceil(r / window_cardinality),
# so consecutive rows share an index in 1..100.
# NOTE: a Window with no partitionBy moves all rows to a single partition.
df = data.withColumn(
    "index",
    F.ceil(
        row_number().over(Window.orderBy(monotonically_increasing_id()))
        / window_cardinality
    ),
)

# Sum the values inside each chunk
df = df.groupBy("index").agg(F.sum("value").alias("TempSum"))

# Running total of the chunk sums, ordered by chunk index
df = df.withColumn("Cumulative Sum", expr("sum(TempSum) over (order by index)"))

# Drop the per-chunk sums, keeping index and running total. show() returns
# None, so it is called separately instead of being assigned to df.
df = df.drop("TempSum")
df.show()

+-----+--------------+
|index|Cumulative Sum|
+-----+--------------+
|    1|        -50969|
|    2|        -98419|
|    3|       -147135|
|    4|       -195280|
|    5|       -244419|
|    6|       -294926|
|    7|       -344856|
|    8|       -394347|
|    9|       -441334|
|   10|       -492367|
|   11|       -543368|
|   12|       -592760|
|   13|       -641927|
|   14|       -691543|
|   15|       -739688|
|   16|       -787765|
|   17|       -836532|
|   18|       -888248|
|   19|       -939387|
|   20|       -986280|
+-----+--------------+
only showing top 20 rows



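As you can see, the results generated by the two jobs are different. This is because `repartition` shuffles the rows across partitions: in the first solution each of the 100 partitions holds a scattered subset of the file instead of a contiguous block of lines, so the original line order is not preserved. The second solution, by contrast, groups consecutive lines into the same chunk.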

# Exercise 2

Execution time on Colab: approximately 25 seconds

In [None]:
from pyspark.sql.types import StructType,StructField, IntegerType
import pyspark.sql.functions as f
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window
from pyspark.sql.functions import expr

# Read the comma-separated file (first line is the header) and cast the
# Value column to integer
data = spark.read.option("header", True).csv("/content/sample_data/exercise_2.txt")
data = data.withColumn("Value", data["Value"].cast(IntegerType()))

# Mean of all the values in the dataset (a Python number, or None if empty)
total_mean = data.select(F.avg("Value").alias("TotalMean")).head()[0]

# Mean of each group, ordered by ID
data_agg = (
    data.groupBy("ID")
    .agg(F.avg("Value").alias("Mean"))
    .orderBy("ID")
)

# Distance of each group mean from the overall mean. It is computed from the
# unrounded mean, and both columns are rounded to 4 decimals only for
# display, so Distance does not carry a double-rounding error.
data_agg = data_agg.withColumn("Distance", F.round(F.col("Mean") - total_mean, 4))
data_agg = data_agg.withColumn("Mean", F.round("Mean", 4))

# show() returns None, so it is not assigned back to data_agg
data_agg.show()

+---+-------+--------+
| ID|   Mean|Distance|
+---+-------+--------+
|  A|-0.4889|  0.0109|
|  B|-0.5041| -0.0043|
|  C|-0.4803|  0.0195|
|  D| -0.507| -0.0072|
|  E|-0.4988|   0.001|
|  F|-0.5044| -0.0046|
|  G|-0.4942|  0.0056|
|  H|-0.4994|  4.0E-4|
|  I|-0.5006| -8.0E-4|
|  J|-0.4952|  0.0046|
|  K|-0.5111| -0.0113|
|  L| -0.516| -0.0162|
|  M|-0.5061| -0.0063|
|  N|-0.5098|   -0.01|
|  O|-0.4912|  0.0086|
|  P|-0.4853|  0.0145|
|  Q|-0.5035| -0.0037|
|  R|-0.4883|  0.0115|
|  S|-0.4973|  0.0025|
|  T|-0.5048|  -0.005|
+---+-------+--------+
only showing top 20 rows

