In [1]:
# Exercises with Spark SQL

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create a SparkSession
spark = SparkSession.builder.appName("App").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

25/02/01 14:16:56 WARN Utils: Your hostname, student-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/02/01 14:16:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/01 14:16:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Exercise 1
"""
The command to create a DataFrame from int_list won't work properly because Spark needs an explicit schema o infered to understand how to manipulate the data.
int_list is not a tuple list or else, so it doesn't have a defined schema what can end up leading to errors
"""

int_list = [1,2,3]
intDF = spark.createDataFrame(int_list)

In [3]:
# Exercise 2

schema_fields = ["project_name", "page_title", "num_requests", "content_size"]

inputDf = spark.read.format("csv").option("delimiter", " ").load("pagecounts-20100806-030000").toDF(*schema_fields)

inputDf.printSchema()

inputDf.show(15,truncate=False)

                                                                                

root
 |-- project_name: string (nullable = true)
 |-- page_title: string (nullable = true)
 |-- num_requests: string (nullable = true)
 |-- content_size: string (nullable = true)

+------------+-----------------------------------------------+------------+------------+
|project_name|page_title                                     |num_requests|content_size|
+------------+-----------------------------------------------+------------+------------+
|aa.b        |Main_Page                                      |1           |4881        |
|aa.b        |Special:Contributions/Beau                     |1           |4938        |
|aa.b        |Special:WhatLinksHere/MediaWiki:Makesysopsubmit|1           |4550        |
|aa.b        |User:Ahoerstemeier                             |1           |4388        |
|aa.b        |User:Monobi                                    |1           |5500        |
|aa.d        |Special:Contributions/Les_Meloures             |1           |4949        |
|aa          |%D0%9

In [4]:
# Exercise 3
print("1. Total number of elements")
#SQL
inputDf.createOrReplaceTempView("pagecounts")
total_elements_sql = spark.sql("SELECT COUNT(*) as total_elements FROM pagecounts")
total_elements_sql.show()
#DataFrame API
total_elements_df = inputDf.count()
print(f"Total number of elements with DataFrame API: {total_elements_sql}")

print("2. Complete list of project names (no repetitions)")
#SQL
distinct_projects_sql = spark.sql("SELECT DISTINCT project_name FROM pagecounts")
distinct_projects_sql.show()

#DataFrame API
distinct_projects_df = inputDf.select("project_name").distinct()
distinct_projects_df.show()

print("3. Total content size of project 'en' (Wikipedia in English)")
inputDf = inputDf.withColumn("content_size", inputDf["content_size"].cast("long"))
#SQL
total_content_en_sql = spark.sql("SELECT SUM(content_size) as total_content_size FROM pagecounts WHERE project_name = 'en'")
total_content_en_sql.show()

#DataFrame API
total_content_en_df = inputDf.filter(inputDf["project_name"]=="en").groupBy().sum("content_size")
total_content_en_df.show()

print("4. Top 5 most visited pages of project 'en', and the number of visits for each")
#SQL
top_5_pages_sql = spark.sql("""
    SELECT page_title, num_requests
    FROM pagecounts
    WHERE project_name = 'en'
    ORDER BY num_requests DESC
    LIMIT 5                                   
""")
top_5_pages_sql.show()

#DataFrame API
top_5_pages_df = inputDf.filter(inputDf.project_name == 'en').select("page_title", "num_requests").orderBy(inputDf["num_requests"].desc()).limit(5)
top_5_pages_df.show()

1. Total number of elements


                                                                                

+--------------+
|total_elements|
+--------------+
|       4729148|
+--------------+



                                                                                

Total number of elements with DataFrame API: DataFrame[total_elements: bigint]
2. Complete list of project names (no repetitions)


                                                                                

+------------+
|project_name|
+------------+
|     cbk-zam|
|        co.b|
|        cs.n|
|          en|
|  be-x-old.d|
|          cr|
|        as.d|
|        dv.n|
|        cr.d|
|         crh|
|       ast.q|
|       als.n|
|       ang.q|
|        am.d|
|        af.q|
|        cy.d|
|       an.mw|
|       be.mw|
|        cs.d|
|        cy.s|
+------------+
only showing top 20 rows



                                                                                

+------------+
|project_name|
+------------+
|     cbk-zam|
|        co.b|
|        cs.n|
|          en|
|  be-x-old.d|
|          cr|
|        as.d|
|        dv.n|
|        cr.d|
|         crh|
|       ast.q|
|       als.n|
|       ang.q|
|        am.d|
|        af.q|
|        cy.d|
|       an.mw|
|       be.mw|
|        cs.d|
|        cy.s|
+------------+
only showing top 20 rows

3. Total content size of project 'en' (Wikipedia in English)


                                                                                

+------------------+
|total_content_size|
+------------------+
|  2.99984572954E11|
+------------------+



                                                                                

+-----------------+
|sum(content_size)|
+-----------------+
|     299984572954|
+-----------------+

4. Top 5 most visited pages of project 'en', and the number of visits for each


                                                                                

+-------------------+------------+
|         page_title|num_requests|
+-------------------+------------+
|          A_Prophet|          99|
|American_wire_gauge|          99|
|   Active_Directory|          99|
|         311_(band)|          99|
|     Adenocarcinoma|          99|
+-------------------+------------+





+-------------------+------------+
|         page_title|num_requests|
+-------------------+------------+
|          A_Prophet|          99|
|American_wire_gauge|          99|
|   Active_Directory|          99|
|         311_(band)|          99|
|     Adenocarcinoma|          99|
+-------------------+------------+



                                                                                

In [5]:
# Exercise 4. The application should be delivered in a separated file with .py extension :)
# Solution on Exercise 4.py

In [6]:
# Exercise 5

columns = ["id","name","surname","age","country","local_phone"]
nice_guys = [(1,"Simón","Bolivar",47,"VEN","489 895 965"),
    (2,"Fidel","Castro",90,"CU","956 268 348"),
    (3,"Jose","Doroteo",45,"MEX","985 621 444"),
    (4,"Ernesto","Guevara",39,"AR","895 325 481"),
    (5,"Hugo","Chávez",58,"VE","489 895 965"),
    (6,"Camilo","Cienfuegos",27,"CUB","956 268 348"),
    (7,"Emiliano","Zapata",39,"ME","985 621 444"),
    (8,"Juan Domingo","Perón",78,"ARG","985 621 444"),
  ]



df = spark.createDataFrame(nice_guys).toDF(*columns)

codes={'CUB':'53','CU':'53','ME':'52','MEX':'52','AR':'54','ARG':'54','VE':'58','VEN':'58'}


## defining the initial function of taking in a country and a phone number
def countryPhoneNumberPrefix(country,phone_number):

    return '+'+codes[country]+' '+phone_number


from pyspark.sql.functions import udf,avg

## UDF serializing the function so that it can be used for spark dataframes
intlPhoneFormat= udf(lambda x,y: countryPhoneNumberPrefix(x,y))


## Applying the UDF to the column and creating a new column intl_phone_format
phoneNumbersCleaned=df.withColumn("intl_phone_format",intlPhoneFormat(df.country,df.local_phone))

## Show results set
phoneNumbersCleaned.show()


## The problem is that we first must map all the countries to have a standard name, we will use the 3 letter codes
codesCountry={'CUB':'CUB','CU':'CUB','ME':'MEX','MEX':'MEX','AR':'ARG','ARG':'ARG','VE':'VEN','VEN':'VEN'}

## defining the initial function of taking in a country and a phone number
def reconcileCountryCode(country):

    return codesCountry[country]

## UDF serializing the function so that it can be used for spark dataframes
standardCountryCode= udf(lambda x: reconcileCountryCode(x))


## Apply the serialization to the dataframe and rename the new column country_standard
ages=df.withColumn("country_standard",standardCountryCode(df.country))

## Aggregate with the new dataframe and take the average
ages=ages.groupBy(ages.country_standard).agg(avg("age").alias("avg_age"))


## Show results set
ages.show()



                                                                                

+---+------------+----------+---+-------+-----------+-----------------+
| id|        name|   surname|age|country|local_phone|intl_phone_format|
+---+------------+----------+---+-------+-----------+-----------------+
|  1|       Simón|   Bolivar| 47|    VEN|489 895 965|  +58 489 895 965|
|  2|       Fidel|    Castro| 90|     CU|956 268 348|  +53 956 268 348|
|  3|        Jose|   Doroteo| 45|    MEX|985 621 444|  +52 985 621 444|
|  4|     Ernesto|   Guevara| 39|     AR|895 325 481|  +54 895 325 481|
|  5|        Hugo|    Chávez| 58|     VE|489 895 965|  +58 489 895 965|
|  6|      Camilo|Cienfuegos| 27|    CUB|956 268 348|  +53 956 268 348|
|  7|    Emiliano|    Zapata| 39|     ME|985 621 444|  +52 985 621 444|
|  8|Juan Domingo|     Perón| 78|    ARG|985 621 444|  +54 985 621 444|
+---+------------+----------+---+-------+-----------+-----------------+





+----------------+-------+
|country_standard|avg_age|
+----------------+-------+
|             CUB|   58.5|
|             MEX|   42.0|
|             VEN|   52.5|
|             ARG|   58.5|
+----------------+-------+



                                                                                