# Spark SQL and DataFrames

## Configuring `JAVA_HOME`

**Optional configuration (in case you need a different JVM version)**

In [1]:
import os

# On a personal computer, if the JAVA_HOME variable is not defined, set it here:
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"

# In the teaching labs, use the following instead:
# os.environ["JAVA_HOME"] = "/usr/"

# Show the current value. Use .get() so that a machine without JAVA_HOME
# reports it as missing instead of failing the cell with a KeyError.
java_home = os.environ.get("JAVA_HOME")
java_home if java_home is not None else "JAVA_HOME is not set"

'/usr/lib/jvm/java-17-openjdk-amd64/'

**Import libraries, check versions**

In [2]:
import pyspark
# Print the installed PySpark version.
print(pyspark.__version__)

3.5.3


In [3]:
%%bash
# Check which Java is on this shell's PATH (may differ from JAVA_HOME if that was set separately).
java -version

openjdk version "17.0.13" 2024-10-15
OpenJDK Runtime Environment (build 17.0.13+11-Ubuntu-2ubuntu122.04)
OpenJDK 64-Bit Server VM (build 17.0.13+11-Ubuntu-2ubuntu122.04, mixed mode, sharing)


## Opening SparkSession

In [4]:
from pyspark.sql import SparkSession

# Create (or reuse) a local Spark session; "local[*]" uses as many worker
# threads as there are logical cores. Extra settings can be chained with
# .config(key, value) before .getOrCreate().
spark = (
    SparkSession.builder
    .appName("101 Spark DataFrames")
    .master("local[*]")
    .getOrCreate()
)

# Low-level entry point, used below for RDD operations such as sc.parallelize.
sc = spark.sparkContext
spark

24/11/13 18:28:01 WARN Utils: Your hostname, maes resolves to a loopback address: 127.0.1.1; using 10.0.78.133 instead (on interface wlp0s20f3)
24/11/13 18:28:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/13 18:28:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Data Frames

## Creating DataFrames interactively

In [5]:
# Creating a DataFrame
rdd = sc.parallelize([("Javier", 35), ("Susana", 19), ("Mateo", 25)]) # RDD (low level)
personas = spark.createDataFrame(rdd, ["name", "age"]) # Data Frame (Spark Structured Data, high level)
print(type(personas))
personas.collect()

                                                                                

<class 'pyspark.sql.dataframe.DataFrame'>


                                                                                

[Row(name='Javier', age=35),
 Row(name='Susana', age=19),
 Row(name='Mateo', age=25)]

In [6]:
# Pretty-print the rows as a text table (at most 20 rows by default).
personas.show()

+------+---+
|  name|age|
+------+---+
|Javier| 35|
|Susana| 19|
| Mateo| 25|
+------+---+



In [7]:
# Column names with their Spark SQL types (the Python ints were inferred as bigint).
personas.dtypes

[('name', 'string'), ('age', 'bigint')]

## Data Frames from CSV files

In [8]:
%%bash
# Peek at the header line and the first rows of the raw CSV file.
head -n 10 data/flight-data/2015-summary.csv

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40


In [9]:
# Read the CSV, taking column names from its first line and letting Spark
# infer the column types (so `count` becomes an integer column).
flight_data_2015 = spark.read.csv(
    "data/flight-data/2015-summary.csv",
    header=True,
    inferSchema=True,
)

In [10]:
# Return the first 3 rows to the driver as a list of Row objects
flight_data_2015.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

In [11]:
# Show the first 10 rows as a text table.
flight_data_2015.show(10)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
|    United States|          Singapore|    1|
|    United States|            Grenada|   62|
|       Costa Rica|      United States|  588|
|          Senegal|      United States|   40|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
only showing top 10 rows



In [12]:
# Routes with more than 5 flights, in ascending order of flight count.
# where/orderBy are aliases of filter/sort.
flight_data_2015.where("count > 5").orderBy("count").show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|           Paraguay|    6|
|         Pakistan|      United States|   12|
|    United States|              Egypt|   12|
|    United States|           Ethiopia|   12|
|    United States|           Pakistan|   12|
|    United States|     Czech Republic|   12|
|    United States|             Angola|   13|
|    United States|            Ukraine|   13|
|         Ethiopia|      United States|   13|
|   Czech Republic|      United States|   13|
|    United States|       Cook Islands|   13|
|    United States|            Uruguay|   13|
|    United States|            Bolivia|   13|
|     Cook Islands|      United States|   13|
|    United States|         Cape Verde|   14|
|          Ukraine|      United States|   14|
|          Romania|      United States|   14|
|    United States|            Romania|   15|
|            Egypt|      United St

In [13]:
# Shuffles (sorts, aggregations) default to 200 output partitions, far more
# than this small dataset needs; use 5 instead.
spark.conf.set("spark.sql.shuffle.partitions", 5)
flight_data_2015.orderBy("count").take(2)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]

### Get number of flights per destination

In [14]:
# Total number of flights per destination country. The column to sum is named
# explicitly: a bare .sum() would aggregate every numeric column in the frame.
count_per_dest = (flight_data_2015
                  .groupBy("DEST_COUNTRY_NAME")
                  .sum("count")
                 )
# Print the physical plan: a partial sum per partition, a shuffle
# (Exchange) by destination, then the final sum.
count_per_dest.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#30], functions=[sum(count#32)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#30, 5), ENSURE_REQUIREMENTS, [plan_id=85]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#30], functions=[partial_sum(count#32)])
         +- FileScan csv [DEST_COUNTRY_NAME#30,count#32] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/maes/Documents/Docencia/SDPD/Codigo/spark/data/flight-data/..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>




In [15]:
# Show up to 50 destinations; no ordering was requested, so the row order is arbitrary.
count_per_dest.show(50)

+--------------------+----------+
|   DEST_COUNTRY_NAME|sum(count)|
+--------------------+----------+
|             Moldova|         1|
|             Bolivia|        30|
|             Algeria|         4|
|Turks and Caicos ...|       230|
|            Pakistan|        12|
|    Marshall Islands|        42|
|            Suriname|         1|
|              Panama|       510|
|         New Zealand|       111|
|             Liberia|         2|
|             Ireland|       335|
|              Zambia|         1|
|            Malaysia|         2|
|               Japan|      1548|
|    French Polynesia|        43|
|           Singapore|         3|
|             Denmark|       153|
|               Spain|       420|
|             Bermuda|       183|
|            Kiribati|        26|
|      Czech Republic|        13|
|             Belgium|       259|
|          Cape Verde|        20|
|         Saint Lucia|       123|
|             Tunisia|         3|
|               Niger|         2|
|      United 

In [16]:
from pyspark.sql.functions import desc

# Top 15 destinations by total number of flights, largest first.
(flight_data_2015
 .groupBy("DEST_COUNTRY_NAME")
 .sum("count")
 .withColumnRenamed("sum(count)", "dest_total")
 .orderBy(desc("dest_total"))
 .limit(15)
 .show())

+------------------+----------+
| DEST_COUNTRY_NAME|dest_total|
+------------------+----------+
|     United States|    411352|
|            Canada|      8399|
|            Mexico|      7140|
|    United Kingdom|      2025|
|             Japan|      1548|
|           Germany|      1468|
|Dominican Republic|      1353|
|       South Korea|      1048|
|       The Bahamas|       955|
|            France|       935|
|          Colombia|       873|
|            Brazil|       853|
|       Netherlands|       776|
|             China|       772|
|           Jamaica|       666|
+------------------+----------+



## Data Frames from JSON files

In [17]:
# Load the tweet sample; Spark infers a nested schema from the JSON records.
tweets = spark.read.format("json").load("data/15m-sample.json")
tweets.take(1)

                                                                                24/11/13 18:28:36 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

[Row(
 activities=None, 
 coordinates=None, 
 created_at=None, 
 favorited=None, 
 geo=None, 
 id=None, 
 id_str=None, 
 in_reply_to_screen_name=None, 
 in_reply_to_status_id=None, 
 in_reply_to_status_id_str=None, 
 in_reply_to_user_id_str=None, 
 place=None, 
 possibly_sensitive=None, 
 retweet_count=None, 
 retweeted=None, 
 source=None, 
 truncated=None, 
 user=None, activities=Row(
 favoriters=None, 
 favoriters_count=None, 
 repliers=None, 
 repliers_count=None, 
 retweeters=None, 
 retweeters_count=None, favoriters=[], favoriters_
 count=None, favoriters_count='0', repliers=[], repliers_
 count=None, repliers_count='0', retweeters=[], retweeters_
 count=None, retweeters_count='0'), contributors=None, coordinates=None, created_
 at=None, created_at='Mon Oct 10 14:46:18 +0000 2011', entities=Row(hashtags=[Row(
 indices=None, indices=[64, 80], text='occupyamsterdam'), Row(
 indices=None, indices=[81, 98], text='occupywallstreet'), Row(
 indices=None, indices=[99, 109], text='99perc

In [18]:
# Number of tweets (records) in the sample.
tweets.count()

                                                                                

8028

## Analyze data with Spark SQL API

In [19]:
from pyspark.sql.types import (
    ByteType,
    FloatType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql import functions

# Explicit schema: Spark skips the inference pass over the file, and the column
# types are fixed up front (age as a 1-byte integer, grade as a 32-bit float).
# NOTE: nullable=False is not enforced when reading files; printSchema() below
# still reports every column as nullable = true.
students_schema = StructType([
    StructField("_id", StringType(), False),
    StructField("name", StringType(), False),
    StructField("surname", StringType(), False),
    StructField("age", ByteType(), False),
    StructField("email", StringType(), False),
    StructField("grade", FloatType(), False),
])

students = spark.read.json("data/estudiantes.json", students_schema)
type(students)


pyspark.sql.dataframe.DataFrame

In [20]:
# Number of student records.
students.count()

10000

In [21]:
# Lowest and highest grade, plus the alphabetically first surname and name.
summary = students.agg(
    functions.min("grade"),
    functions.max("grade"),
    functions.min("surname"),
    functions.min("name"),
)
summary.show()

[Stage 26:>                                                         (0 + 1) / 1]

+----------+----------+------------+---------+
|min(grade)|max(grade)|min(surname)|min(name)|
+----------+----------+------------+---------+
|      5.03|      10.0|   Abad Abad|  Agustin|
+----------+----------+------------+---------+



                                                                                

In [22]:
# Pearson correlation coefficient between age and grade, rounded to 6 decimals
print("Correlation between 'age' and 'grade': ", round(students.corr("age", "grade"),6))

[Stage 29:>                                                         (0 + 1) / 1]

Correlation between 'age' and 'grade':  0.005887


                                                                                

In [23]:
# Contingency table age | grade: one row per age, one column per distinct grade
# (the result is very wide because grade has many distinct values).
students.crosstab("age", "grade").collect()

                                                                                

[Row(age_grade='19', 10.0=0, 5.03=0, 5.04=0, 5.05=0, 5.08=0, 5.09=0, 5.13=0, 5.16=0, 5.17=1, 5.18=0, 5.2=0, 5.24=0, 5.25=0, 5.29=1, 5.3=0, 5.31=0, 5.32=0, 5.35=0, 5.36=0, 5.37=0, 5.38=0, 5.4=0, 5.45=0, 5.46=2, 5.47=1, 5.48=0, 5.49=1, 5.5=1, 5.51=1, 5.53=0, 5.54=0, 5.55=0, 5.56=0, 5.57=0, 5.58=0, 5.59=0, 5.6=0, 5.62=0, 5.64=0, 5.65=0, 5.66=0, 5.67=0, 5.68=0, 5.69=1, 5.7=0, 5.71=1, 5.72=1, 5.73=1, 5.74=0, 5.75=1, 5.76=0, 5.77=0, 5.78=0, 5.79=0, 5.8=0, 5.81=1, 5.82=1, 5.83=0, 5.84=0, 5.85=1, 5.86=1, 5.87=1, 5.88=3, 5.89=1, 5.9=1, 5.91=0, 5.92=0, 5.93=0, 5.94=1, 5.95=2, 5.96=0, 5.97=1, 5.98=0, 5.99=3, 6.0=1, 6.01=0, 6.02=1, 6.03=1, 6.04=0, 6.05=0, 6.06=0, 6.07=1, 6.08=0, 6.09=1, 6.1=0, 6.11=1, 6.12=2, 6.13=1, 6.14=1, 6.15=3, 6.16=1, 6.17=1, 6.18=1, 6.19=0, 6.2=2, 6.21=1, 6.22=4, 6.23=1, 6.24=0, 6.25=0, 6.26=0, 6.27=3, 6.28=0, 6.29=0, 6.3=1, 6.31=1, 6.32=2, 6.33=1, 6.34=1, 6.35=1, 6.36=0, 6.37=1, 6.38=4, 6.39=0, 6.4=1, 6.41=3, 6.42=3, 6.43=0, 6.44=4, 6.45=1, 6.46=1, 6.47=3, 6.48=1, 6.49=0, 

### Inspecting column types

In [24]:
# Column types as declared by the explicit schema (ByteType shows up as tinyint).
students.dtypes

[('_id', 'string'),
 ('name', 'string'),
 ('surname', 'string'),
 ('age', 'tinyint'),
 ('email', 'string'),
 ('grade', 'float')]

In [25]:
# Tree view of the schema, including each column's nullability.
students.printSchema()

root
 |-- _id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- age: byte (nullable = true)
 |-- email: string (nullable = true)
 |-- grade: float (nullable = true)



In [26]:
# Name, surname and age of students strictly older than 18 and younger than 20.
(students
 .select('name', 'surname', 'age')
 .where((students.age > 18) & (students.age < 20))
 .show())

+---------+-----------------+---+
|     name|          surname|age|
+---------+-----------------+---+
|    David|   Miranda Grande| 19|
|   Marcos|  Sarabia Gisbert| 19|
|   Sergio|    Sanchez Agudo| 19|
|  Natalia|  Moliner Sanchez| 19|
|     Lope|    Cuesta Blasco| 19|
|    Oriol|   Pascual Guerra| 19|
|    Berta|   Sedano Pedrosa| 19|
|   Daniel| Gonzalez Sanchez| 19|
|     Jose|    Lopez Antunez| 19|
|    Pablo|  Bermejo Antunez| 19|
|Valentina|Corominas Antunez| 19|
|    Pedro|   Cuervo Segovia| 19|
|Alejandro|       Tous Bonet| 19|
|     Sara|  Gonzalez Sastre| 19|
|Guadalupe|      Seco Cuesta| 19|
|  Enrique|  Bermejo Peinado| 19|
|   Felipe|  Segovia Segarra| 19|
|    Juana|   Sanchez Abadia| 19|
|   Sergio|    Bermejo Gomez| 19|
| Cristian|    Gomez Bermejo| 19|
+---------+-----------------+---+
only showing top 20 rows



In [27]:
# For each first name: average age and lowest grade, sorted alphabetically.
# (Replace .show() with .explain() to inspect the query plan instead.)
(students
 .groupBy('name')
 .agg(functions.avg('age'), functions.min('grade'))
 .sort('name')
 .show())

+---------+------------------+----------+
|     name|          avg(age)|min(grade)|
+---------+------------------+----------+
|  Agustin|             21.68|      5.59|
|Alejandro|22.395522388059703|      5.62|
|   Alonso|22.486238532110093|      5.57|
|   Alvaro|22.385245901639344|      5.93|
|  Antonia|21.783783783783782|      5.37|
|  Antonio|21.663716814159294|      5.75|
|    Berta|21.823529411764707|      5.76|
|   Camila|22.041666666666668|      5.16|
|  Claudia|22.085271317829456|      5.05|
| Cristian|21.702479338842974|      5.75|
|   Daniel|21.975806451612904|      5.66|
|    David|22.296187683284458|      5.35|
|    Diego|22.057142857142857|      5.17|
|  Dolores|21.982905982905983|      5.85|
|    Elena|22.023622047244096|      5.03|
|   Emilia|22.495934959349594|       5.2|
|     Emma|22.466666666666665|      5.17|
|  Enrique| 22.37735849056604|      5.46|
|  Esteban| 22.14503816793893|      5.54|
|   Felipe|22.048458149779737|      5.91|
+---------+------------------+----

In [28]:
# Students with a grade above 9.5, best grades first. The boolean column
# itself is the filter condition, so no "== True" comparison is needed.
(students
 .select("surname", "grade")
 .withColumn('Excellent', students.grade > 9.5)
 .where('Excellent')
 .orderBy('grade', ascending=False)
 .show())

+-----------------+-----+---------+
|          surname|grade|Excellent|
+-----------------+-----+---------+
|    Segovia Lopez| 10.0|     true|
|  Sanchez Gisbert| 9.99|     true|
|    Gomez Segarra| 9.99|     true|
| Pascual Gonzalez| 9.98|     true|
|    Comas Segovia| 9.98|     true|
|  Bermejo Gisbert| 9.98|     true|
|  Sanchez Peinado| 9.97|     true|
|  Sedano Martinez| 9.97|     true|
|Sarabia PeralSanz| 9.97|     true|
|     Guerra Lopez| 9.97|     true|
|   Parada Sanchez| 9.97|     true|
| PeralSanz Grande| 9.96|     true|
|     Aznar Abadia| 9.96|     true|
|   Gomez Martinez| 9.96|     true|
| Bermejo Belmonte| 9.95|     true|
|      Torres Peña| 9.95|     true|
|    Grande Crespo| 9.95|     true|
|   Lopez Gonzalez| 9.95|     true|
|     Rajoy Garcia| 9.94|     true|
| Gonzalez Gisbert| 9.94|     true|
+-----------------+-----+---------+
only showing top 20 rows



# SQL queries

In [29]:
# Example with excerpt of Twitter data.
# Reuses the `tweets` DataFrame loaded earlier from data/15m-sample.json and
# registers it as a temporary view so SQL queries can refer to it by name.
tweets.createOrReplaceTempView("tweets")

# A simple query (LIMIT without ORDER BY: which 10 rows are returned is not guaranteed)
rset = spark.sql("SELECT id, created_at, text FROM tweets LIMIT 10")
rset.show()

+------------------+--------------------+--------------------+
|                id|          created_at|                text|
+------------------+--------------------+--------------------+
|123409021318336512|Mon Oct 10 14:46:...|Occupy wall Stree...|
|123431682190737408|Mon Oct 10 16:16:...|Parce que si on c...|
|123423114318196738|Mon Oct 10 15:42:...|RT @acampadasol: ...|
|123398762411474944|Mon Oct 10 14:05:...|RT @ciudadanoNick...|
|123429357799747585|Mon Oct 10 16:07:...|RT @democraciarea...|
|123438512233119744|Mon Oct 10 16:43:...|RT @democraciarea...|
|123356436901277696|Mon Oct 10 11:17:...|RT @drymty: @Cumb...|
|123380990730182657|Mon Oct 10 12:54:...|RT @madari59: #15...|
|123635477545885696|Tue Oct 11 05:46:...|"#15oct Video: Ka...|
|123631273108455424|Tue Oct 11 05:29:...|action in #OCCUPY...|
+------------------+--------------------+--------------------+



In [30]:
# The schema Spark inferred from the JSON file keeps nested objects as struct
# columns, so fields of `user` can be addressed with dot notation in SQL.
# Uses the standard SQL equality operator `=` (Spark also accepts `==`).
rset = spark.sql("""
SELECT user.id, user.name, user.location
FROM tweets
WHERE user.location = 'Barcelona'
LIMIT 20""")
rset.show()

+---------+-------------------+---------+
|       id|               name| location|
+---------+-------------------+---------+
|125299040|       rosa ciurana|Barcelona|
|366671456|   Josefina arribas|Barcelona|
|300712661|       Pedro Paramo|Barcelona|
|307535668|Assemblea St Antoni|Barcelona|
|307535668|Assemblea St Antoni|Barcelona|
|307535668|Assemblea St Antoni|Barcelona|
|304885181|         buscamanis|Barcelona|
|119814153|    Sergio Carrillo|Barcelona|
|203220213|        Miquel Gené|Barcelona|
|203220213|        Miquel Gené|Barcelona|
| 19876616|        João França|Barcelona|
|203220213|        Miquel Gené|Barcelona|
|203220213|        Miquel Gené|Barcelona|
| 19876616|        João França|Barcelona|
|310277854|        AsbleaRaval|Barcelona|
|145391848|    Eduardo Rosales|Barcelona|
| 99490181|        Josep Cerdà|Barcelona|
| 52760260|             Sergio|Barcelona|
|347267438|         Chiara_bcn|Barcelona|
| 18897024|      canalsolidari|Barcelona|
+---------+-------------------+---

In [32]:
# Stop the SparkSession
spark.stop()