# Introduction to Spark and Python

Let's learn how to use Spark with Python by using the pyspark library! Make sure to view the video lecture explaining Spark and RDDs before continuing on with this code.

This notebook will serve as reference code for the Big Data section of the course involving Amazon Web Services. The video will provide fuller explanations for what the code is doing.

## Creating a SparkContext

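First we need a SparkContext. In current versions of PySpark the usual entry point is a SparkSession, and the SparkContext is available from its `sparkContext` attribute. The helper below shows a fuller, cluster-oriented configuration (YARN, dynamic allocation, an Oracle JDBC driver on the classpath). It is kept for reference and is not called in the rest of this notebook: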

In [1]:
def _build_spark_session(
    app_name, driver_cores, driver_mem, max_executors, executor_cores,
    executor_mem, queue
):
    """Build (or reuse) a SparkSession running on YARN in client deploy mode."""
    # Requires `from pyspark.sql import SparkSession` (imported after pip install)
    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.master", "yarn")
        .config("spark.submit.deployMode", "client")
        # CPU cores and heap memory for the driver and for each executor
        .config("spark.driver.cores", driver_cores)
        .config("spark.driver.memory", driver_mem)
        .config("spark.executor.cores", executor_cores)
        .config("spark.executor.memory", executor_mem)
        # Let Spark add and remove executors (between 0 and max_executors)
        # with the workload; this relies on the external shuffle service
        .config("spark.shuffle.service.enabled", True)
        .config("spark.dynamicAllocation.enabled", True)
        .config("spark.dynamicAllocation.minExecutors", 0)
        .config("spark.dynamicAllocation.maxExecutors", max_executors)
        # Extra non-heap memory, in MiB, on top of the heap settings above
        .config("spark.executor.memoryOverhead", 2048)
        .config("spark.driver.memoryOverhead", 1024)
        .config("spark.yarn.queue", queue)
        # .config("spark.sql.session.timeZone", "UTC")
        # Put the Oracle JDBC driver on the driver and executor classpaths
        .config("spark.driver.extraClassPath", "/soft/ora1210/db/jdbc/lib/ojdbc6.jar")
        .config("spark.executor.extraClassPath", "/soft/ora1210/db/jdbc/lib/ojdbc6.jar")
        .getOrCreate()
    )


In [2]:
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession


In [32]:
# getOrCreate() returns the already running session if there is one
spark_session = (
    SparkSession.builder
    .appName("app_name")
    .getOrCreate()
)

In [5]:
spark_context = spark_session.sparkContext

The SparkContext represents the connection to a Spark cluster and can be used to create RDDs and broadcast variables on that cluster. Here we take it from the SparkSession rather than constructing it directly.

*Note! You can only have one SparkContext at a time the way we are running things here.*

## Basic Operations

We're going to start with a 'hello world' example, which is just reading a text file. First let's create a text file.
___

Let's write an example text file to read. We'll use the `%%writefile` Jupyter cell magic for this, but feel free to use any .txt file:

In [6]:
%%writefile example.txt
first line
second line
third line
fourth line

Writing example.txt


### Creating the RDD

Now we can read in the text file using the **textFile** method of the SparkContext we created. This method reads a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and returns it as an RDD of strings, one element per line.
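As a rough single-machine analogy (plain Python, no Spark involved), the sketch below reads the same four lines into a list of strings with the line terminators removed, which is what the elements of the RDD look like for this small file. The temporary file only makes the example self-contained; unlike `textFile`, this does not partition the data or read from HDFS.

```python
import os
import tempfile

# The same content that %%writefile produced in example.txt
content = "first line\nsecond line\nthird line\nfourth line\n"

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "example.txt")
    with open(path, "w", encoding="utf-8") as fh:
        fh.write(content)

    # One string per line, newline stripped: the shape of the RDD's elements
    with open(path, encoding="utf-8") as fh:
        lines = fh.read().splitlines()

print(lines)                 # ['first line', 'second line', 'third line', 'fourth line']
print(len(lines), lines[0])  # 4 first line
```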

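In [None]:
# Only needed to build a SparkContext yourself from a SparkConf; not used below
from pyspark import SparkConf, SparkContext

In [None]:
# Run this only when you are completely finished: a stopped context can no longer
# create RDDs, so the cells below would fail if it ran first
# spark_context.stop()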

In [7]:
textFile = spark_context.textFile('example.txt')

Spark’s primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs.

### Actions

We have just created an RDD using the textFile method and can perform operations on this object, such as counting the rows.

RDDs have actions, which return values, and transformations, which return pointers to new RDDs. Let’s start with a few actions:
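To make the split between transformations and actions concrete, here is a toy, single-machine model in plain Python. `ToyRDD` is a hypothetical class invented for this illustration and is not part of pyspark: its `map` and `filter` only record a step and return a new object, while `count`, `collect`, and `first` run the recorded steps and return ordinary Python values. The `shout` helper (also hypothetical) logs each call so we can see when work actually happens.

```python
class ToyRDD:
    """Hypothetical single-machine stand-in for an RDD; not part of pyspark."""

    def __init__(self, source, step=None):
        self._source = source  # a plain list, or the parent ToyRDD
        self._step = step      # function taking an iterator and returning one

    # Transformations: record the step and return a new ToyRDD; no work is done
    def map(self, func):
        return ToyRDD(self, lambda items: (func(x) for x in items))

    def filter(self, pred):
        return ToyRDD(self, lambda items: (x for x in items if pred(x)))

    def _iterate(self):
        # Walk back to the source data, then apply the recorded steps in order
        if isinstance(self._source, ToyRDD):
            items = self._source._iterate()
        else:
            items = iter(self._source)
        return self._step(items) if self._step else items

    # Actions: run the whole chain and return ordinary Python values
    def collect(self):
        return list(self._iterate())

    def count(self):
        return sum(1 for _ in self._iterate())

    def first(self):
        return next(self._iterate())


calls = []


def shout(line):
    calls.append(line)  # records every time the mapped function really runs
    return line.upper()


lines = ToyRDD(["first line", "second line", "third line", "fourth line"])
upper = lines.map(shout)                         # transformation
seconds = upper.filter(lambda s: "SECOND" in s)  # transformation
calls_before = len(calls)                        # nothing has run yet
n = seconds.count()                              # action: runs map, then filter
calls_after = len(calls)
print(calls_before, n, calls_after)  # 0 1 4
print(lines.first())                 # first line
```

As with a Spark RDD that has not been cached, every action recomputes the chain from the source: calling `seconds.collect()` after the count runs `shout` four more times.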

In [8]:
textFile.count()

4

In [9]:
textFile.first()

'first line'

### Transformations

Now we can use transformations. For example, the filter transformation returns a new RDD containing a subset of the items in the file. Let's create a sample transformation using the filter() method. Just like Python's own filter function, this method only keeps the elements that satisfy the condition. Let's look for lines that contain the word 'second'; there should be exactly one such line.
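For comparison, here is Python's built-in `filter` applied to the same four lines with the same predicate. In Python 3 it returns a lazy iterator, which loosely mirrors a transformation: no element is tested until `list()` consumes it. Everything here runs in one process, with no partitioning or cluster involved.

```python
lines = ["first line", "second line", "third line", "fourth line"]

# Lazy: returns an iterator, nothing has been tested yet
matches = filter(lambda line: "second" in line, lines)

# Consuming the iterator plays the role of an action
result = list(matches)
print(result)       # ['second line']
print(len(result))  # 1
```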

In [10]:
secfind = textFile.filter(lambda line: 'second' in line)

In [11]:
# Displaying the transformed RDD shows only its description, not its contents
secfind

PythonRDD[4] at RDD at PythonRDD.scala:53

In [12]:
# Perform an action on the transformed RDD
secfind.collect()

['second line']

In [13]:
# Perform an action on the transformed RDD
secfind.count()

1

In [14]:
from pyspark.sql import functions as f

In [33]:
from pyspark.sql import Row
import datetime

l = [(datetime.date(2018,1,3), 'Ankit',25, 'F'),
     (datetime.date(2018,2,3), 'Jalfaizy',22, 'M'),
     (datetime.date(2018,1,5), 'saurabh',20, 'F'),
     (datetime.date(2018,1,12), 'Bala',26, 'F'),
     (datetime.date(2018,7,9), 'Jules',19, 'M') ,
     (datetime.date(2018,3,18), 'Arild',43, 'M'),
     (datetime.date(2018,1,5), 'sarah',20, 'F'),
     (datetime.date(2018,8,12), 'Boly',33, 'M'),
     (datetime.date(2018,4,6), 'Anita',35, 'F'),
     (datetime.date(2018,12,6), 'Jules',22, 'M'),
     (datetime.date(2018,7,24), 'Soul',20, 'M'),
     (datetime.date(2018,6,17), 'Gral',54, 'F'),
     (datetime.date(2018,9,7), 'Apoh',18, 'M'),
     (datetime.date(2018,10,4), 'Dony',32, 'M'),
     (datetime.date(2018,2,5), 'Tanoh',31, 'M'),
     (datetime.date(2018,11,12), 'Issouf',27, 'M'),
     (datetime.date(2018,10,3), 'Bilé',29, 'F'),
     (datetime.date(2018,5,3), 'Gagnon',20, 'M'),
     (datetime.date(2018,3,5), 'Papiss',28, 'F'),
     (datetime.date(2018,2,12), 'Kravitz',34, 'F'),
     (datetime.date(2018,5,9), 'Mouli',35, 'F'),
     (datetime.date(2018,8,3), 'Jacques',27, 'M'),
     (datetime.date(2018,12,5), 'soum',22, 'M'),
     (datetime.date(2018,4,12), 'MBra',36, 'F')]

rdd = spark_session.sparkContext.parallelize(l)
people = rdd.map(lambda x: Row(date=x[0], name=x[1], age=int(x[2]), sexe=x[3]))
schemaPeople = spark_session.createDataFrame(people)


In [34]:
schemaPeople.groupby("sexe").agg(f.count("*").alias('nb_sexe')).toPandas()

|   | sexe | nb_sexe |
|---|------|---------|
| 0 | F    | 11      |
| 1 | M    | 13      |
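The same aggregation can be cross-checked in plain Python with `collections.Counter`: group the rows by their `sexe` value and count the rows in each group. The list below repeats the name, age, and sex fields of the 24 records in `l`; the dates are omitted because this count does not need them.

```python
from collections import Counter

# (name, age, sexe) for the same 24 people as the list l
people = [
    ("Ankit", 25, "F"), ("Jalfaizy", 22, "M"), ("saurabh", 20, "F"),
    ("Bala", 26, "F"), ("Jules", 19, "M"), ("Arild", 43, "M"),
    ("sarah", 20, "F"), ("Boly", 33, "M"), ("Anita", 35, "F"),
    ("Jules", 22, "M"), ("Soul", 20, "M"), ("Gral", 54, "F"),
    ("Apoh", 18, "M"), ("Dony", 32, "M"), ("Tanoh", 31, "M"),
    ("Issouf", 27, "M"), ("Bilé", 29, "F"), ("Gagnon", 20, "M"),
    ("Papiss", 28, "F"), ("Kravitz", 34, "F"), ("Mouli", 35, "F"),
    ("Jacques", 27, "M"), ("soum", 22, "M"), ("MBra", 36, "F"),
]

# Equivalent of groupby("sexe").agg(count("*")): one counter entry per group
nb_sexe = Counter(sexe for _name, _age, sexe in people)
print(sorted(nb_sexe.items()))  # [('F', 11), ('M', 13)]
```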


In [20]:
# Number of people whose age is greater than 30
schemaPeople.filter(schemaPeople.age > 30).count()

9

In [21]:
schemaPeople.filter(schemaPeople.age > 30).show()  # or .toPandas() for a pandas DataFrame

+----------+-------+---+----+
|      date|   name|age|sexe|
+----------+-------+---+----+
|2018-03-18|  Arild| 43|   M|
|2018-08-12|   Boly| 33|   M|
|2018-04-06|  Anita| 35|   F|
|2018-06-17|   Gral| 54|   F|
|2018-10-04|   Dony| 32|   M|
|2018-02-05|  Tanoh| 31|   M|
|2018-02-12|Kravitz| 34|   F|
|2018-05-09|  Mouli| 35|   F|
|2018-04-12|   MBra| 36|   F|
+----------+-------+---+----+



In [35]:
# Number of women born after 2018-03-01 and younger than 40
schemaPeople.filter(
    (schemaPeople.date > datetime.date(2018, 3, 1))
    & (schemaPeople.sexe == 'F')
    & (schemaPeople.age < 40)
).count()

5

In [36]:
# Number of women younger than 25
schemaPeople.filter((schemaPeople.sexe == 'F') & (schemaPeople.age < 25)).count()

2
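In PySpark, column conditions are combined with `&`, `|`, and `~` rather than Python's `and`, `or`, and `not`, because those keywords cannot be overloaded for `Column` objects. Each comparison also needs its own parentheses, since `&` binds more tightly than `>`, `<`, or `==`. In plain Python, working on ordinary values row by row, `and` is fine. The sketch below re-runs the two filters above on a copy of the 24 records in `l` as a sanity check of the counts.

```python
import datetime

# (date, name, age, sexe): the same 24 records as the list l
people = [
    (datetime.date(2018, 1, 3), "Ankit", 25, "F"),
    (datetime.date(2018, 2, 3), "Jalfaizy", 22, "M"),
    (datetime.date(2018, 1, 5), "saurabh", 20, "F"),
    (datetime.date(2018, 1, 12), "Bala", 26, "F"),
    (datetime.date(2018, 7, 9), "Jules", 19, "M"),
    (datetime.date(2018, 3, 18), "Arild", 43, "M"),
    (datetime.date(2018, 1, 5), "sarah", 20, "F"),
    (datetime.date(2018, 8, 12), "Boly", 33, "M"),
    (datetime.date(2018, 4, 6), "Anita", 35, "F"),
    (datetime.date(2018, 12, 6), "Jules", 22, "M"),
    (datetime.date(2018, 7, 24), "Soul", 20, "M"),
    (datetime.date(2018, 6, 17), "Gral", 54, "F"),
    (datetime.date(2018, 9, 7), "Apoh", 18, "M"),
    (datetime.date(2018, 10, 4), "Dony", 32, "M"),
    (datetime.date(2018, 2, 5), "Tanoh", 31, "M"),
    (datetime.date(2018, 11, 12), "Issouf", 27, "M"),
    (datetime.date(2018, 10, 3), "Bilé", 29, "F"),
    (datetime.date(2018, 5, 3), "Gagnon", 20, "M"),
    (datetime.date(2018, 3, 5), "Papiss", 28, "F"),
    (datetime.date(2018, 2, 12), "Kravitz", 34, "F"),
    (datetime.date(2018, 5, 9), "Mouli", 35, "F"),
    (datetime.date(2018, 8, 3), "Jacques", 27, "M"),
    (datetime.date(2018, 12, 5), "soum", 22, "M"),
    (datetime.date(2018, 4, 12), "MBra", 36, "F"),
]

cutoff = datetime.date(2018, 3, 1)

# Women born after the cutoff and younger than 40
born_after_female_under_40 = [
    name for date, name, age, sexe in people
    if date > cutoff and sexe == "F" and age < 40
]
# Women younger than 25
female_under_25 = [name for _date, name, age, sexe in people if sexe == "F" and age < 25]

print(len(born_after_female_under_40), born_after_female_under_40)
# 5 ['Anita', 'Bilé', 'Papiss', 'Mouli', 'MBra']
print(len(female_under_25), female_under_25)  # 2 ['saurabh', 'sarah']
```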

In [37]:
# Create a "categorie" column: "jeune" (young) if age < 25, "vieux" (old) otherwise
schemaPeople = schemaPeople.withColumn("categorie", f.when(schemaPeople.age < 25, "jeune").otherwise("vieux"))
# Show the first ten rows as a pandas DataFrame
schemaPeople.toPandas().head(10)

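|   | date       | name     | age | sexe | categorie |
|---|------------|----------|-----|------|-----------|
| 0 | 2018-01-03 | Ankit    | 25  | F    | vieux     |
| 1 | 2018-02-03 | Jalfaizy | 22  | M    | jeune     |
| 2 | 2018-01-05 | saurabh  | 20  | F    | jeune     |
| 3 | 2018-01-12 | Bala     | 26  | F    | vieux     |
| 4 | 2018-07-09 | Jules    | 19  | M    | jeune     |
| 5 | 2018-03-18 | Arild    | 43  | M    | vieux     |
| 6 | 2018-01-05 | sarah    | 20  | F    | jeune     |
| 7 | 2018-08-12 | Boly     | 33  | M    | vieux     |
| 8 | 2018-04-06 | Anita    | 35  | F    | vieux     |
| 9 | 2018-12-06 | Jules    | 22  | M    | jeune     |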
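`f.when(condition, value).otherwise(other)` behaves like a per-row conditional expression. The plain-Python function below (`categorie` is a hypothetical helper named after the new column) applies the same rule to the ages of the ten rows shown above and reproduces their `categorie` values. Note that an age of exactly 25 maps to "vieux", because the condition is the strict comparison `age < 25`.

```python
def categorie(age):
    """Hypothetical helper: same rule as f.when(age < 25, "jeune").otherwise("vieux")."""
    return "jeune" if age < 25 else "vieux"


# Ages of the ten rows displayed above (Ankit through the second Jules)
ages = [25, 22, 20, 26, 19, 43, 20, 33, 35, 22]
categories = [categorie(age) for age in ages]
print(categories)
# ['vieux', 'jeune', 'jeune', 'vieux', 'jeune', 'vieux', 'jeune', 'vieux', 'vieux', 'jeune']
```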


In [38]:
import os
import datetime

In [39]:
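# Build a dated output directory: data_repository/YYYY/MM/DD
current_date = datetime.date.today()
filepath = "data_repository"
filename = os.path.join(filepath, str(current_date.year),
                        str(current_date.month).zfill(2), str(current_date.day).zfill(2))
# coalesce(1) merges the data into one partition, so a single Parquet part file is
# written inside that directory; "overwrite" replaces any earlier output there
schemaPeople.coalesce(1).write.mode("overwrite").parquet(filename)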
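The dated path can also be built with `strftime`, which zero-pads the month and day itself. `dated_path` is a hypothetical helper for illustration only; the check below confirms that, for a fixed date, it produces the same string as the `zfill` construction used in the cell above.

```python
import datetime
import os


def dated_path(base, day):
    """Hypothetical helper: return base/YYYY/MM/DD for a datetime.date."""
    # %m and %d are zero-padded by strftime, so no zfill is needed
    return os.path.join(base, day.strftime("%Y"), day.strftime("%m"), day.strftime("%d"))


day = datetime.date(2018, 3, 5)
zfill_version = os.path.join("data_repository", str(day.year),
                             str(day.month).zfill(2), str(day.day).zfill(2))
print(dated_path("data_repository", day))  # data_repository/2018/03/05 with POSIX separators
print(dated_path("data_repository", day) == zfill_version)  # True
```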

In [None]:
# Save the rows in the "jeune" category to a CSV file in the data_repository folder.
# toPandas() collects all matching rows to the driver, so keep this for small results.
schemaPeople.filter(schemaPeople.categorie == 'jeune').toPandas().to_csv("data_repository/jeune.csv")

Notice that transformations don't display any output and aren't run until an action is called. In the next lecture, Advanced Spark and Python, we will see many more examples of this relationship between transformations and actions!

# Great Job!