<a href="https://colab.research.google.com/github/PaulSerin/Big-Data-Framework/blob/main/BDF_03_Working_with_DataFrames.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 00 - Configuration of Apache Spark on Colaboratory


### Installing Java, Spark, and Findspark


---


This code installs Apache Spark 3.5.3, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [1]:
import os

os.environ["SPARK_VERSION"] = "spark-3.5.3"
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget  http://apache.osuosl.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!echo $SPARK_VERSION-bin-hadoop3.tgz
!rm $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,172 kB]
Get:12 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [8,501 kB]
Get:13 https://r2u.stat

### Set Environment Variables
Set the locations where Spark and Java are installed.

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark/"
os.environ["DRIVE_DATA"] = "/content/gdrive/My Drive/Big Data Framework/data/"

!rm /content/spark
!ln -s /content/$SPARK_VERSION-bin-hadoop3 /content/spark
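# A "!export" only affects a throwaway subshell, so PATH is updated through os.environ instead
os.environ["PATH"] += ":" + os.environ["SPARK_HOME"] + "bin:" + os.environ["SPARK_HOME"] + "sbin"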
!echo $SPARK_HOME
!env |grep  "DRIVE_DATA"

rm: cannot remove '/content/spark': No such file or directory
/content/spark/
DRIVE_DATA=/content/gdrive/My Drive/Big Data Framework/data/


### Start a SparkSession
This will start a local Spark session.

In [3]:
!python -V

import findspark
findspark.init()

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Example: shows the PySpark version
print("PySpark version {0}".format(sc.version))

# Example: parallelize a list and show its first 2 elements
sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)

Python 3.10.12
PySpark version 3.5.3


[2, 3]

In [4]:
from pyspark.sql import SparkSession
# We create a SparkSession object (or we retrieve it if it is already created)
spark = SparkSession \
.builder \
.appName("My application") \
.config("spark.some.config.option", "some-value") \
.master("local[4]") \
.getOrCreate()
# We get the SparkContext
sc = spark.sparkContext

In [5]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive




---


# 03 - Working with DataFrames

## Introduction to DataFrames
We will see:

  - How to create a DataFrame
  - Basic operations on DataFrames
      - Show rows
      - Select columns
      - Rename, add and delete columns
      - Delete null values and duplicated rows
      - Replace values
  - Save DataFrames in different formats

## Creating DataFrames
A DataFrame can be created in different ways:

  - From a data sequence
  - From Row-type objects
  - From an RDD or a Dataset
  - Reading data from a file
      - As in Hadoop, Spark supports different filesystems: local, HDFS, Amazon S3
          - By and large, it supports any data source that can be read with Hadoop
      - Spark can access different types of files: plain text, CSV, JSON, [Parquet](https://parquet.apache.org/), [ORC](https://orc.apache.org/), Sequence, etc.
        -   It also supports compressed files
  - Accessing relational databases or NoSQL databases
    -   MySQL, Postgres, etc. using JDBC/ODBC
    -  Hive, HBase, Cassandra, MongoDB, AWS Redshift, etc.
    
Some examples of how to create DataFrames follow:

### From a sequence or a list of data

In [6]:
from pyspark.sql.functions import col,expr
# Creating a DataFrame from a range and adding two columns
df = spark.range(1,7,2).toDF("n")
df.show()
df.withColumn("n1", col("n")+1).withColumn("n2", expr("2*n")).show()
# Note that in the call to 'expr' we can include SQL code

+---+
|  n|
+---+
|  1|
|  3|
|  5|
+---+

+---+---+---+
|  n| n1| n2|
+---+---+---+
|  1|  2|  2|
|  3|  4|  6|
|  5|  6| 10|
+---+---+---+



In [8]:
# DataFrame from a list of tuples
l = [("Eric", 5.1, "Pass"),\
     ("John", 4.0, "Fail"),\
     ("Manuel", None, None)]
dfMarks = spark.createDataFrame(l, schema=["Name", "mark", "result"])
dfMarks.show()
dfMarks.printSchema()

+------+----+------+
|  Name|mark|result|
+------+----+------+
|  Eric| 5.1|  Pass|
|  John| 4.0|  Fail|
|Manuel|NULL|  NULL|
+------+----+------+

root
 |-- Name: string (nullable = true)
 |-- mark: double (nullable = true)
 |-- result: string (nullable = true)



### Creating DataFrames with a schema


When creating a DataFrame, it is a good idea to specify its schema:

  - The schema defines the names and data types of each column
  - It uses an object of type ``StructType`` to define the name and type of the columns
  - The data types used by Spark are defined in:
      - For PySpark: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#data-types
      - For Scala: https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.sql.types.package


In [9]:
from pyspark.sql.types import StructField, StructType, FloatType, StringType
from pyspark.sql import Row
# Define the DataFrame schema
schemaMarks = StructType([
    StructField("Name", StringType(), False),
    StructField("mark", FloatType(), True),
    StructField("result", StringType(), True)
    ])

# Create the DataFrame from a list of Row objects
rows = [Row("Eric", 5.1, "Pass"),\
         Row("John", 4.0, "Fail"),\
         Row("Manuel", None, None)]

dfMarks = spark.createDataFrame(rows, schema=schemaMarks)
dfMarks.show()
dfMarks.printSchema()

+------+----+------+
|  Name|mark|result|
+------+----+------+
|  Eric| 5.1|  Pass|
|  John| 4.0|  Fail|
|Manuel|NULL|  NULL|
+------+----+------+

root
 |-- Name: string (nullable = false)
 |-- mark: float (nullable = true)
 |-- result: string (nullable = true)



### Creating DataFrames from a text file


Each file line is stored as a row

### Creating DataFrames from a CSV file (revisited)

As an example, we are going to use a file with questions and replies from Stack Exchange (https://stackexchange.com/) in Italian.
It is a CSV file with the following 13 fields:

  0. ``nComs`` - Number of comments on the question or the reply
  1. ``lastActivity`` - Date and time of the last modification
  2. ``userId`` - Owner's ID
  3. ``body`` - Text of the question or reply
  4. ``score`` - Score of the question or reply based on positive and negative votes
  5. ``creationDate`` - Creation date and time
  6. ``numViewed`` - Number of times viewed (null if the question has never been viewed)
  7. ``title`` - Question title (null if it is a reply)
  8. ``tags`` - Tags assigned to the question (null if there are no tags assigned)
  9. ``nAnswers`` - Number of replies related to the question (null if there are not any)
  10. ``acceptedAnswerId`` - The ID of the accepted answer (null if the question has no accepted answer)
  11. ``postType`` - Type of message: 1 question, 2 reply
  12. ``id`` - Unique message identifier

Fields are separated by the "~" symbol

#### a) Read the file and infer the schema

In [10]:
dfSEInferred = spark.read.format("csv")\
                    .option("mode", "FAILFAST")\
                    .option("sep", "~")\
                    .option("inferSchema", "true")\
                    .option("header", "false")\
                    .option("nullValue", "null")\
                    .option("compression", "bzip2")\
                    .load(os.environ["DRIVE_DATA"] +"italianPosts.csv.bz2")

Some options:

1. ``mode``: specifies what to do when it finds corrupted entries
    - ``PERMISSIVE``: sets all fields to null when a corrupted entry is found (default value)
    - ``DROPMALFORMED``: deletes the rows with corrupted entries
    - ``FAILFAST``: returns an error when a corrupted entry is found
2. ``sep``:  field delimiter (by default ",")
3. ``inferSchema``: whether column types must be inferred (by default "false")
4. ``header``: if "true", the first line is taken as the header (by default "false")
5. ``nullValue``: character or string that represents a NULL in the file (by default "")
6. ``compression``: compression type (by default "none")
  
These options are similar for other types of files.

In [11]:
# Show 5 rows
dfSEInferred.show(5)

+---+--------------------+---+--------------------+---+--------------------+----+--------------------+--------------------+----+----+----+----+
|_c0|                 _c1|_c2|                 _c3|_c4|                 _c5| _c6|                 _c7|                 _c8| _c9|_c10|_c11|_c12|
+---+--------------------+---+--------------------+---+--------------------+----+--------------------+--------------------+----+----+----+----+
|  4|2013-11-11 18:21:...| 17|&lt;p&gt;The infi...| 23|2013-11-10 19:37:...|NULL|                NULL|                NULL|NULL|NULL|   2|1165|
|  5|2013-11-10 20:31:...| 12|&lt;p&gt;Come cre...|  1|2013-11-10 19:44:...|  61|Cosa sapreste dir...| &lt;word-choice&gt;|   1|NULL|   1|1166|
|  2|2013-11-10 20:31:...| 17|&lt;p&gt;Il verbo...|  5|2013-11-10 19:58:...|NULL|                NULL|                NULL|NULL|NULL|   2|1167|
|  1|2014-07-25 13:15:...|154|&lt;p&gt;As part ...| 11|2013-11-10 22:03:...| 187|Ironic constructi...|&lt;english-compa...|   4|1170|   

In [12]:
# Find out how the schema was inferred
dfSEInferred.schema

StructType([StructField('_c0', IntegerType(), True), StructField('_c1', TimestampType(), True), StructField('_c2', IntegerType(), True), StructField('_c3', StringType(), True), StructField('_c4', IntegerType(), True), StructField('_c5', TimestampType(), True), StructField('_c6', IntegerType(), True), StructField('_c7', StringType(), True), StructField('_c8', StringType(), True), StructField('_c9', IntegerType(), True), StructField('_c10', IntegerType(), True), StructField('_c11', IntegerType(), True), StructField('_c12', IntegerType(), True)])

In [13]:
# Another way of getting the same result
dfSEInferred.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: timestamp (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: timestamp (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)



#### b) Read the file and specify the schema

In [16]:
from pyspark.sql.types import *
# We first create a list with each column header
# Note: avoid spaces and non-ascii characters on column names
header = (["nComs", "lastActivity", "userId",
            "body", "score", "creationDate", "numViewed", "title",
            "tags", "nAnswers", "acceptedAnswerId", "postType", "id"])

# Define the schema for the elements of the table
# StructType -> Defines a schema for the DF from a list of StructFields
# StructField -> Defines the name and type of each column, and whether it is nullable (third argument)
dfSE_Schema = StructType([
  StructField(header[0], IntegerType(), True),
  StructField(header[1], TimestampType(), True),
  StructField(header[2], LongType(), True),
  StructField(header[3], StringType(), True),
  StructField(header[4], IntegerType(), True),
  StructField(header[5], TimestampType(), True),
  StructField(header[6], IntegerType(), True),
  StructField(header[7], StringType(), True),
  StructField(header[8], StringType(), True),
  StructField(header[9], IntegerType(), True),
  StructField(header[10], LongType(), True),
  StructField(header[11], ByteType(), True),
  StructField(header[12], LongType(), True)
  ])

dfSE = spark.read.format("csv")\
                    .option("mode", "FAILFAST")\
                    .option("sep", "~")\
                    .option("inferSchema", "false")\
                    .option("header", "false")\
                    .option("nullValue", "null")\
                    .option("compression", "bzip2")\
                    .schema(dfSE_Schema)\
                    .load(os.environ["DRIVE_DATA"] +"italianPosts.csv.bz2")
dfSE.cache()
dfSE.show()

+-----+--------------------+------+--------------------+-----+--------------------+---------+--------------------+--------------------+--------+----------------+--------+----+
|nComs|        lastActivity|userId|                body|score|        creationDate|numViewed|               title|                tags|nAnswers|acceptedAnswerId|postType|  id|
+-----+--------------------+------+--------------------+-----+--------------------+---------+--------------------+--------------------+--------+----------------+--------+----+
|    4|2013-11-11 18:21:...|    17|&lt;p&gt;The infi...|   23|2013-11-10 19:37:...|     NULL|                NULL|                NULL|    NULL|            NULL|       2|1165|
|    5|2013-11-10 20:31:...|    12|&lt;p&gt;Come cre...|    1|2013-11-10 19:44:...|       61|Cosa sapreste dir...| &lt;word-choice&gt;|       1|            NULL|       1|1166|
|    2|2013-11-10 20:31:...|    17|&lt;p&gt;Il verbo...|    5|2013-11-10 19:58:...|     NULL|                NULL|      

In [None]:
dfSE.sort("id").show()

In [17]:
dfSE.printSchema()

root
 |-- nComs: integer (nullable = true)
 |-- lastActivity: timestamp (nullable = true)
 |-- userId: long (nullable = true)
 |-- body: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- creationDate: timestamp (nullable = true)
 |-- numViewed: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- nAnswers: integer (nullable = true)
 |-- acceptedAnswerId: long (nullable = true)
 |-- postType: byte (nullable = true)
 |-- id: long (nullable = true)



## Basic operations with DataFrames

### Show rows

In [18]:
# show(n) shows the first n rows (by default, n=20)
dfSE.show(5)

+-----+--------------------+------+--------------------+-----+--------------------+---------+--------------------+--------------------+--------+----------------+--------+----+
|nComs|        lastActivity|userId|                body|score|        creationDate|numViewed|               title|                tags|nAnswers|acceptedAnswerId|postType|  id|
+-----+--------------------+------+--------------------+-----+--------------------+---------+--------------------+--------------------+--------+----------------+--------+----+
|    4|2013-11-11 18:21:...|    17|&lt;p&gt;The infi...|   23|2013-11-10 19:37:...|     NULL|                NULL|                NULL|    NULL|            NULL|       2|1165|
|    5|2013-11-10 20:31:...|    12|&lt;p&gt;Come cre...|    1|2013-11-10 19:44:...|       61|Cosa sapreste dir...| &lt;word-choice&gt;|       1|            NULL|       1|1166|
|    2|2013-11-10 20:31:...|    17|&lt;p&gt;Il verbo...|    5|2013-11-10 19:58:...|     NULL|                NULL|      

In [19]:
# Pass truncate=False to show long fields in full
dfSE.show(5, truncate=False)

+-----+-----------------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [21]:
# take(n) returns the first n rows as a Python list of Row objects
# (the variable is not called 'list' to avoid shadowing the Python built-in)
first_rows = dfSE.take(5)
print(first_rows[1])
print("\n")
# collect() returns the whole DataFrame as a Python list of Row objects
# Warning: if the DataFrame is too large, it can exhaust the driver's memory!
all_rows = dfSE.collect()
print(all_rows[10])

Row(nComs=5, lastActivity=datetime.datetime(2013, 11, 10, 20, 31, 0, 177000), userId=12, body="&lt;p&gt;Come credo sia conosciuto da tutti quelli che usano viaggiare con l'automobile, molti italiani hanno uno strano rapporto con gli abbaglianti; alcuni li amano così tanto che preferiscono mantenerli sempre accesi, altri invece li usano per segnalare, se non addirittura per comunicare informazioni di vario genere, dalla presenza di autovelox alla protesta per presunte violazioni del codice della strada.&lt;/p&gt;&lt;p&gt;Al di lá delle considerazioni e dei commenti circa queste abitudini, mi piacerebbe sapere se il verbo &quot;sfanagliare&quot; è normalmente usato, e compreso, in tutte le regioni italiane o se, magari, ci sono altri verbi in uso, purchè simpatici come quello.&lt;/p&gt;&lt;p&gt;Laddove qualcuno non avesse compreso l'uso del aforementioned verbo, ecco un esempio:&lt;/p&gt;&lt;blockquote&gt;  &lt;p&gt;&quot;Ehi!&quot; - dice il marito a sua moglie - &quot;Quello li mi sta 

In [22]:
# sample(withReplacement, fraction, seed=None) returns a new DataFrame with a random subset of the rows
# fraction is the expected share of rows, so the sampled count is only approximately fraction * count()
dfSESampled = dfSE.sample(False, 0.1, seed=None)
print("Original Number of rows = {0}; Number of sampled rows = {1}".format(dfSE.count(), dfSESampled.count()))

Original Number of rows = 1261; Number of sampled rows = 117


In [23]:
# limit(n) returns a new DataFrame with at most n rows
dfSE_10rows = dfSE.sample(False, 0.1, seed=None).limit(10)
print("Number of sampled rows = {0}".format(dfSE_10rows.count()))
dfSE_10rows.show()

Number of sampled rows = 10
+-----+--------------------+------+--------------------+-----+--------------------+---------+--------------------+--------------------+--------+----------------+--------+----+
|nComs|        lastActivity|userId|                body|score|        creationDate|numViewed|               title|                tags|nAnswers|acceptedAnswerId|postType|  id|
+-----+--------------------+------+--------------------+-----+--------------------+---------+--------------------+--------------------+--------+----------------+--------+----+
|    2|2013-11-10 20:31:...|    17|&lt;p&gt;Il verbo...|    5|2013-11-10 19:58:...|     NULL|                NULL|                NULL|    NULL|            NULL|       2|1167|
|    2|2013-11-10 22:17:...|    17|&lt;p&gt;There's ...|    8|2013-11-10 22:17:...|     NULL|                NULL|                NULL|    NULL|            NULL|       2|1170|
|    2|2013-11-11 23:23:...|   159|&lt;p&gt;Sono un'...|    7|2013-11-11 18:19:...|      138

### Execute an operation on each row
The `foreach` method applies a function to each row.

- The DataFrame is not modified and no new DataFrame is created
- `foreach` runs on the workers, not on the driver

In [25]:
def printid(f):
    print(f["id"])

# In theory, this code should print all values of the 'id' column.
# Due to the way the notebook manages tasks, it is not possible to see any output.
# Run it on a pyspark-shell to see the output.
dfSE_10rows.foreach(printid)


### Select columns

In [26]:
# Creates a new DataFrame by selecting columns by name
dfIdBody = dfSE.select("id", "body")
dfIdBody.show(5)

print("The idBody object is of type {0}".format(type(dfIdBody)))

+----+--------------------+
|  id|                body|
+----+--------------------+
|1165|&lt;p&gt;The infi...|
|1166|&lt;p&gt;Come cre...|
|1167|&lt;p&gt;Il verbo...|
|1168|&lt;p&gt;As part ...|
|1169|&lt;p&gt;&lt;em&g...|
+----+--------------------+
only showing top 5 rows

The idBody object is of type <class 'pyspark.sql.dataframe.DataFrame'>


In [27]:
# Another way of specifying the columns to select
dfIdBody2 = dfSE.select(dfSE.id, dfSE.body)
dfIdBody2.show(5)

+----+--------------------+
|  id|                body|
+----+--------------------+
|1165|&lt;p&gt;The infi...|
|1166|&lt;p&gt;Come cre...|
|1167|&lt;p&gt;Il verbo...|
|1168|&lt;p&gt;As part ...|
|1169|&lt;p&gt;&lt;em&g...|
+----+--------------------+
only showing top 5 rows



In [28]:
# It is also possible to specify objects of Column type...
from pyspark.sql.functions import col

colId = col("id")
colCreateDate = col("creationDate")
print("The colId object is of type {0}".format(type(colId)))
print("The colCreateDate object is of type {0}".format(type(colCreateDate)))

The colId object is of type <class 'pyspark.sql.column.Column'>
The colCreateDate object is of type <class 'pyspark.sql.column.Column'>


In [29]:
# ... and create a DataFrame from Column objects, by renaming the columns
dfIdBodyDate = dfSE.select(colId,
                              colCreateDate.alias("Creation_date"),
                              dfSE.body.alias("Content"))
dfIdBodyDate.show(5)

+----+--------------------+--------------------+
|  id|       Creation_date|             Content|
+----+--------------------+--------------------+
|1165|2013-11-10 19:37:...|&lt;p&gt;The infi...|
|1166|2013-11-10 19:44:...|&lt;p&gt;Come cre...|
|1167|2013-11-10 19:58:...|&lt;p&gt;Il verbo...|
|1168|2013-11-10 22:03:...|&lt;p&gt;As part ...|
|1169|2013-11-10 22:15:...|&lt;p&gt;&lt;em&g...|
+----+--------------------+--------------------+
only showing top 5 rows



#### Select columns by using expressions

Columns can also be selected with SQL expressions, using `expr` or `selectExpr`.

In [30]:
from pyspark.sql.functions import expr
# Same DataFrame as before but using expressions
dfIdDateBodyExpr = dfSE.select(
                           expr("id AS ID"),
                           expr('creationDate AS Creation_date'),
                           expr("body AS Content"))
dfIdDateBodyExpr.show(5)


+----+--------------------+--------------------+
|  ID|       Creation_date|             Content|
+----+--------------------+--------------------+
|1165|2013-11-10 19:37:...|&lt;p&gt;The infi...|
|1166|2013-11-10 19:44:...|&lt;p&gt;Come cre...|
|1167|2013-11-10 19:58:...|&lt;p&gt;Il verbo...|
|1168|2013-11-10 22:03:...|&lt;p&gt;As part ...|
|1169|2013-11-10 22:15:...|&lt;p&gt;&lt;em&g...|
+----+--------------------+--------------------+
only showing top 5 rows



In [31]:
# We can use more complex expressions
dfSE.selectExpr("*", # Select all columns and set ValidReply to True for those with, at least, one reply.
                "(nAnswers IS NOT NULL) as ValidReply").show()

+-----+--------------------+------+--------------------+-----+--------------------+---------+--------------------+--------------------+--------+----------------+--------+----+----------+
|nComs|        lastActivity|userId|                body|score|        creationDate|numViewed|               title|                tags|nAnswers|acceptedAnswerId|postType|  id|ValidReply|
+-----+--------------------+------+--------------------+-----+--------------------+---------+--------------------+--------------------+--------+----------------+--------+----+----------+
|    4|2013-11-11 18:21:...|    17|&lt;p&gt;The infi...|   23|2013-11-10 19:37:...|     NULL|                NULL|                NULL|    NULL|            NULL|       2|1165|     false|
|    5|2013-11-10 20:31:...|    12|&lt;p&gt;Come cre...|    1|2013-11-10 19:44:...|       61|Cosa sapreste dir...| &lt;word-choice&gt;|       1|            NULL|       1|1166|      true|
|    2|2013-11-10 20:31:...|    17|&lt;p&gt;Il verbo...|    5|201

### Rename, add and delete columns


In [32]:
# Rename the creationDate column
dfSE = dfSE.withColumnRenamed("creationDate", "Creation_date")
dfSE.cache()
dfSE.select("Creation_date",
            dfSE.numViewed.alias("Number_of_visits"),
            "score",
            "postType").show(truncate=False)

+-----------------------+----------------+-----+--------+
|Creation_date          |Number_of_visits|score|postType|
+-----------------------+----------------+-----+--------+
|2013-11-10 19:37:54.187|NULL            |23   |2       |
|2013-11-10 19:44:53.797|61              |1    |1       |
|2013-11-10 19:58:02.1  |NULL            |5    |2       |
|2013-11-10 22:03:41.027|187             |11   |1       |
|2013-11-10 22:15:17.693|NULL            |3    |2       |
|2013-11-10 22:17:22.38 |NULL            |8    |2       |
|2013-11-11 09:51:11.22 |NULL            |3    |2       |
|2013-11-11 10:09:05.117|NULL            |1    |2       |
|2013-11-11 10:28:12.613|122             |5    |1       |
|2013-11-11 10:58:02.62 |NULL            |5    |2       |
|2013-11-11 11:31:02.343|114             |4    |1       |
|2013-11-11 11:39:12.703|58              |3    |1       |
|2013-11-11 11:57:03.723|NULL            |6    |2       |
|2013-11-11 12:00:25.583|NULL            |1    |2       |
|2013-11-11 12

In [33]:
# Add a new column 'ones' with all its values set to 1
from pyspark.sql.functions import lit
# lit wraps a Python literal in a Spark Column
# (in this example, of IntegerType)
dfSE = dfSE.withColumn("ones", lit(1))
dfSE.show(5)

+-----+--------------------+------+--------------------+-----+--------------------+---------+--------------------+--------------------+--------+----------------+--------+----+----+
|nComs|        lastActivity|userId|                body|score|       Creation_date|numViewed|               title|                tags|nAnswers|acceptedAnswerId|postType|  id|ones|
+-----+--------------------+------+--------------------+-----+--------------------+---------+--------------------+--------------------+--------+----------------+--------+----+----+
|    4|2013-11-11 18:21:...|    17|&lt;p&gt;The infi...|   23|2013-11-10 19:37:...|     NULL|                NULL|                NULL|    NULL|            NULL|       2|1165|   1|
|    5|2013-11-10 20:31:...|    12|&lt;p&gt;Come cre...|    1|2013-11-10 19:44:...|       61|Cosa sapreste dir...| &lt;word-choice&gt;|       1|            NULL|       1|1166|   1|
|    2|2013-11-10 20:31:...|    17|&lt;p&gt;Il verbo...|    5|2013-11-10 19:58:...|     NULL|  

In [34]:
# Removes a column using drop
dfSE = dfSE.drop(col("ones"))
dfSE.columns

['nComs',
 'lastActivity',
 'userId',
 'body',
 'score',
 'Creation_date',
 'numViewed',
 'title',
 'tags',
 'nAnswers',
 'acceptedAnswerId',
 'postType',
 'id']

### Delete null and duplicated values

In [36]:
# Remove all rows that have a null in any of their columns
dfNoNulls = dfSE.dropna("any")
print("Initial number of rows: {0}; number of non null rows: {1}"
       .format(dfSE.count(), dfNoNulls.count()))


Initial number of rows: 1261; number of non null rows: 222


In [37]:
# Remove rows that have null on all their columns
dfNeitherNull = dfSE.dropna("all")
print("Number of rows with all columns set to null: {0}"
       .format(dfSE.count() - dfNeitherNull.count()))

Number of rows with all columns set to null: 0


In [38]:
# Remove duplicated rows
dfWithoutDuplicates = dfSE.dropDuplicates()
print("Number of duplicated rows: {0}"
       .format(dfSE.count() - dfWithoutDuplicates.count()))

Number of duplicated rows: 0


In [39]:
# Remove rows when a given column is duplicated
dfWithoutDuplicatedUser = dfSE.dropDuplicates(["userId"])
print("Number of unique users: {0}"
       .format(dfWithoutDuplicatedUser.count()))

Number of unique users: 218


In [40]:
# Other examples
dfNoNullnumViewedAcceptedAnswerId = dfSE.dropna("any", subset=["numViewed", "acceptedAnswerId"])
print("Number of rows with numViewed AND acceptedAnswerId not null: {0}"
       .format(dfNoNullnumViewedAcceptedAnswerId.count()))

dfNoNullnumViewedAcceptedAnswerId = dfSE.dropna("all", subset=["numViewed", "acceptedAnswerId"])
print("Number of rows with numViewed OR acceptedAnswerId not null: {0}"
       .format(dfNoNullnumViewedAcceptedAnswerId.count()))

Number of rows with numViewed AND acceptedAnswerId not null: 222
Number of rows with numViewed OR acceptedAnswerId not null: 374


### Replacing values

In [41]:
# Replace with 0 all null values in the numViewed and nAnswers fields
dfSE = dfSE.fillna(0, subset=["numViewed", "nAnswers"])
dfSE.show(5)

+-----+--------------------+------+--------------------+-----+--------------------+---------+--------------------+--------------------+--------+----------------+--------+----+
|nComs|        lastActivity|userId|                body|score|       Creation_date|numViewed|               title|                tags|nAnswers|acceptedAnswerId|postType|  id|
+-----+--------------------+------+--------------------+-----+--------------------+---------+--------------------+--------------------+--------+----------------+--------+----+
|    4|2013-11-11 18:21:...|    17|&lt;p&gt;The infi...|   23|2013-11-10 19:37:...|        0|                NULL|                NULL|       0|            NULL|       2|1165|
|    5|2013-11-10 20:31:...|    12|&lt;p&gt;Come cre...|    1|2013-11-10 19:44:...|       61|Cosa sapreste dir...| &lt;word-choice&gt;|       1|            NULL|       1|1166|
|    2|2013-11-10 20:31:...|    17|&lt;p&gt;Il verbo...|    5|2013-11-10 19:58:...|        0|                NULL|      

In [42]:
# Replace the value 1170 with 3000 in columns "id" and "acceptedAnswerId"
dfSE.select("id", "acceptedAnswerId").show(10)
dfSE.replace(1170, 3000, subset=["id", "acceptedAnswerId"])\
    .select("id", "acceptedAnswerId")\
    .show(10)

+----+----------------+
|  id|acceptedAnswerId|
+----+----------------+
|1165|            NULL|
|1166|            NULL|
|1167|            NULL|
|1168|            1170|
|1169|            NULL|
|1170|            NULL|
|1171|            NULL|
|1172|            NULL|
|1173|            1181|
|1174|            NULL|
+----+----------------+
only showing top 10 rows

+----+----------------+
|  id|acceptedAnswerId|
+----+----------------+
|1165|            NULL|
|1166|            NULL|
|1167|            NULL|
|1168|            3000|
|1169|            NULL|
|3000|            NULL|
|1171|            NULL|
|1172|            NULL|
|1173|            1181|
|1174|            NULL|
+----+----------------+
only showing top 10 rows



## Saving DataFrames

As with reading, Spark can save DataFrames in multiple formats:

- CSV, JSON, Parquet, ORC...

It can also write them to a database.

In [43]:
# Save the dfSE DataFrame in JSON format
#dfSE.write.format("json").mode("overwrite").save("/content/dfSE.json")
dfSE.write.json(os.environ["DRIVE_DATA"] + "dfSE.json",mode="overwrite")

#!mv /content/dfSE.json "$DRIVE_DATA"

In [44]:
!ls -alh "$DRIVE_DATA"/dfSE.json

total 1.4M
-rw------- 1 root root 1.4M Nov 25 13:41 part-00000-3ad09b66-4ebc-4c66-8342-066dea7211d2-c000.json
-rw------- 1 root root  11K Nov 25 13:41 .part-00000-3ad09b66-4ebc-4c66-8342-066dea7211d2-c000.json.crc
-rw------- 1 root root    0 Nov 25 13:41 _SUCCESS
-rw------- 1 root root    8 Nov 25 13:41 ._SUCCESS.crc


In [45]:
# Save the DataFrame using Parquet
dfSE.write.format("parquet")\
    .mode("overwrite")\
    .save(os.environ["DRIVE_DATA"] + "dfSE.parquet")

In [46]:
# By default, Parquet files are written with Snappy compression
!ls -alh "$DRIVE_DATA"/dfSE.parquet

total 620K
-rw------- 1 root root 615K Nov 25 13:41 part-00000-54666160-c9a1-46c6-a342-2cc196f61a4b-c000.snappy.parquet
-rw------- 1 root root 4.9K Nov 25 13:41 .part-00000-54666160-c9a1-46c6-a342-2cc196f61a4b-c000.snappy.parquet.crc
-rw------- 1 root root    0 Nov 25 13:41 _SUCCESS
-rw------- 1 root root    8 Nov 25 13:41 ._SUCCESS.crc


It will create as many files as there are partitions in the DataFrame

In [47]:
dfSE2 = dfSE.repartition(2)
# Save the DataFrame using Parquet, with gzip compression
dfSE2.write.format("parquet")\
     .mode("overwrite")\
     .option("compression", "gzip")\
     .save(os.environ["DRIVE_DATA"] + "dfSE2.parquet")

In [48]:
!ls -alh "$DRIVE_DATA"/dfSE2.parquet

total 419K
-rw------- 1 root root 203K Nov 25 13:42 part-00000-cb5d7ac4-7144-42a0-948e-31c9031217b1-c000.gz.parquet
-rw------- 1 root root 1.6K Nov 25 13:42 .part-00000-cb5d7ac4-7144-42a0-948e-31c9031217b1-c000.gz.parquet.crc
-rw------- 1 root root 212K Nov 25 13:42 part-00001-cb5d7ac4-7144-42a0-948e-31c9031217b1-c000.gz.parquet
-rw------- 1 root root 1.7K Nov 25 13:42 .part-00001-cb5d7ac4-7144-42a0-948e-31c9031217b1-c000.gz.parquet.crc
-rw------- 1 root root    0 Nov 25 13:42 _SUCCESS
-rw------- 1 root root    8 Nov 25 13:42 ._SUCCESS.crc


### Partitioning

When saving a DataFrame, Spark can partition the output by the values of a given column:

- A directory is created for each distinct value in the partitioning column
    - All data associated with that value is stored in that directory
- This simplifies access to the values associated with a given key


In [49]:
# Save our DataFrame partitioned by the userId field (using Parquet)
dfSE.write.format("parquet")\
    .mode("overwrite")\
    .partitionBy("userId")\
    .save(os.environ["DRIVE_DATA"] + "dfSE-partition.parquet")


In [50]:
#!ls -lh "$DRIVE_DATA"dfSE-partition.parquet
!ls -lh "$DRIVE_DATA"dfSE-partition.parquet/userId=10
#rm -rf "$DRIVE_DATA"dfSE-partition.parquet

total 4.5K
-rw------- 1 root root 4.4K Nov 25 13:44 part-00000-e4717893-b646-48d6-992d-e91f2aa8cf91.c000.snappy.parquet




---

# Exercises


## Exercise 3.1: Word count

Count the number of words *per line* in the $DRIVE_DATA/quijote.txt file.

Repeat the exercise but this time counting the number of words *in the whole file*.

In [None]:
from pyspark.sql import functions as F
# so that we can use the F.split() function.

In [55]:
from pyspark.sql import functions as F

# Step 1: load the text file into a DataFrame (one row per line)
file_path = os.environ["DRIVE_DATA"] + "quijote.txt"
df = spark.read.text(file_path).withColumnRenamed("value", "line")

# Step 2: count the words on each line
# Note: splitting on a single space counts an empty line as 1 and repeated spaces as extra words
df_with_word_count = df.withColumn("word_count", F.size(F.split(F.col("line"), " ")))
df_with_word_count.show(10, truncate=False)

# Step 3: count the words in the whole file
total_word_count = df_with_word_count.agg(F.sum("word_count").alias("total_word_count"))
total_word_count.show()


+---------------------------------------------------------------------------+----------+
|line                                                                       |word_count|
+---------------------------------------------------------------------------+----------+
|The Project Gutenberg EBook of Don Quijote, by Miguel de Cervantes Saavedra|12        |
|                                                                           |1         |
|This eBook is for the use of anyone anywhere at no cost and with           |14        |
|almost no restrictions whatsoever.  You may copy it, give it away or       |13        |
|re-use it under the terms of the Project Gutenberg License included        |11        |
|with this eBook or online at www.gutenberg.net                             |7         |
|                                                                           |1         |
|                                                                           |1         |
|Title: Don Quijote  