Useful resources:

https://www.youtube.com/playlist?list=PL0hSJrxggIQr6wA8buIn1Yxu810ugGed-

https://www.tutorialspoint.com/apache_spark/apache_spark_quick_guide.htm

https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html

https://spark.apache.org/docs/latest/api/python/index.html

# RDD

## 1. Initializing Spark

### SparkContext

In [1]:
from pyspark import SparkContext
sc = SparkContext(master = 'local[2]')

### Inspect SparkContext

In [2]:
sc

In [None]:
# In a notebook only the last expression of a cell is displayed; wrap each line in print() to see every value
sc.version                # Retrieve SparkContext version
sc.pythonVer              # Retrieve Python version
sc.master                 # Master URL to connect to
str(sc.sparkHome)         # Path where Spark is installed on worker nodes
str(sc.sparkUser())       # Retrieve name of the Spark user running SparkContext
sc.appName                # Return application name
sc.applicationId          # Retrieve application ID
sc.defaultParallelism     # Return default level of parallelism
sc.defaultMinPartitions   # Default minimum number of partitions for RDDs

### Configuration

In [1]:
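from pyspark import SparkConf, SparkContext
# Only one SparkContext can be active at a time: call sc.stop() (or restart the kernel) before running this cell
conf = (SparkConf()
        .setMaster("local")
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)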

In [2]:
sc

## 2. Loading Data

### Parallelized Collections

In [2]:
rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([("a",["x","y","z"]),("b",["p", "r"])])

### External Data

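Read text files from HDFS, a local file system, or any Hadoop-supported file system URI with `textFile()` (a single path, a directory, or a glob pattern), or read a directory of text files with `wholeTextFiles()`. `textFile()` returns an RDD with one element per line, while `wholeTextFiles()` returns an RDD of `(path, content)` pairs, one per file.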

In [None]:
textFile = sc.textFile("/my/directory/*.txt")
textFile2 = sc.wholeTextFiles("/my/directory/")
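To make the difference in return shape concrete, here is a pure-Python sketch that needs no Spark installation. The helpers `text_file_like` and `whole_text_files_like` are hypothetical stand-ins that only read the local file system; they mimic the shape of the results (lines versus `(path, content)` pairs), not Spark's distributed reading or its URI handling.

```python
import glob
import os
import tempfile


def text_file_like(pattern):
    """Hypothetical stand-in for sc.textFile: one element per line, across all matching files."""
    lines = []
    for path in sorted(glob.glob(pattern)):
        with open(path) as fh:
            lines.extend(fh.read().splitlines())
    return lines


def whole_text_files_like(directory):
    """Hypothetical stand-in for sc.wholeTextFiles: one (path, full_content) pair per file."""
    pairs = []
    for path in sorted(glob.glob(os.path.join(directory, "*"))):
        with open(path) as fh:
            pairs.append((path, fh.read()))
    return pairs


with tempfile.TemporaryDirectory() as d:
    # Two small sample files, created only for this illustration
    for name, body in [("a.txt", "x\ny\n"), ("b.txt", "z\n")]:
        with open(os.path.join(d, name), "w") as fh:
            fh.write(body)
    lines = text_file_like(os.path.join(d, "*.txt"))
    pairs = whole_text_files_like(d)
    print(lines)                                     # ['x', 'y', 'z']
    print([os.path.basename(p) for p, _ in pairs])   # ['a.txt', 'b.txt']
```

Line-oriented data (logs, CSV) usually suits `textFile()`; `wholeTextFiles()` fits many small files whose content must stay together.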

## 3. Retrieving RDD Information 

### Basic Information

In [8]:
rdd.getNumPartitions() #List the number of partitions

1

In [9]:
rdd.count()  # Count RDD instances

3

In [10]:
rdd.countByKey() #Count RDD instances by key

defaultdict(int, {'a': 2, 'b': 1})

In [11]:
rdd.countByValue() #Count RDD instances by value

defaultdict(int, {('a', 7): 1, ('a', 2): 1, ('b', 2): 1})

In [12]:
rdd.collectAsMap() #Return (key,value) pairs as a dictionary

{'a': 2, 'b': 2}

In [13]:
rdd3.sum()  # Sum of RDD elements

4950

In [20]:
sc.parallelize([]).isEmpty() #Check whether RDD is empty

True

### Summary

In [None]:
rdd3.max()         # Maximum value of RDD elements: 99
rdd3.min()         # Minimum value of RDD elements: 0
rdd3.mean()        # Mean value of RDD elements: 49.5
rdd3.stdev()       # Standard deviation of RDD elements: 28.866070047722118
rdd3.variance()    # Variance of RDD elements: 833.25
rdd3.histogram(3)  # Compute histogram by bins
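With an integer argument, `histogram()` splits the range from min to max into evenly spaced buckets; every bucket is half-open except the last, which also includes the maximum. The following is a hypothetical pure-Python model of that rule (`even_histogram` is not a Spark function, and unlike Spark it always computes float edges), useful for checking what `rdd3.histogram(3)` should return.

```python
def even_histogram(values, buckets):
    """Hypothetical pure-Python model of rdd.histogram(<int>) with evenly spaced buckets."""
    lo, hi = min(values), max(values)
    if lo == hi:
        # All values are equal: a single bucket holds everything
        return [lo, hi], [len(values)]
    width = (hi - lo) / buckets
    edges = [lo + i * width for i in range(buckets)] + [hi]
    counts = [0] * buckets
    for v in values:
        # Buckets are [edge_i, edge_i+1), except the last one, which is
        # closed so that the maximum value is counted
        idx = min(int((v - lo) / width), buckets - 1)
        counts[idx] += 1
    return edges, counts


edges, counts = even_histogram(list(range(100)), 3)
print(edges)   # [0.0, 33.0, 66.0, 99]
print(counts)  # [33, 33, 34]
```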

In [21]:
rdd3.stats() #Summary

(count: 100, mean: 49.5, stdev: 28.86607004772212, max: 99.0, min: 0.0)
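The `stdev()` and `variance()` values above are population statistics (dividing by n); PySpark also offers `sampleStdev()` and `sampleVariance()`, which divide by n - 1. A quick check with Python's standard `statistics` module, which needs no Spark:

```python
import statistics

data = list(range(100))  # same values as rdd3

# Population statistics: what rdd3.variance() and rdd3.stdev() report
pop_var = statistics.pvariance(data)
pop_std = statistics.pstdev(data)

# Sample statistics (divide by n - 1): counterparts of
# rdd3.sampleVariance() and rdd3.sampleStdev()
samp_var = statistics.variance(data)
samp_std = statistics.stdev(data)

print(pop_var, pop_std)
print(samp_var, samp_std)
```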

## 4. Applying Functions

In [3]:
rdd.map(lambda x: x+(x[1],x[0])).collect() #Apply a function to each RDD element

[('a', 7, 7, 'a'), ('a', 2, 2, 'a'), ('b', 2, 2, 'b')]

In [4]:
rdd5 = rdd.flatMap(lambda x: x+(x[1],x[0])) #Apply a function to each RDD element and flatten the result
rdd5.collect()

['a', 7, 7, 'a', 'a', 2, 2, 'a', 'b', 2, 2, 'b']

In [5]:
rdd4.flatMapValues(lambda x: x).collect() #Apply a flatMap function to each (key,value) pair of rdd4 without changing the keys

[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
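The difference between `map`, `flatMap`, and `flatMapValues` can be reproduced on plain Python lists. This is only a local sketch using the same data as `rdd` and `rdd4`; `flat_map_values` is a hypothetical helper, not part of PySpark.

```python
from itertools import chain

pairs = [('a', 7), ('a', 2), ('b', 2)]                # same data as rdd
nested = [("a", ["x", "y", "z"]), ("b", ["p", "r"])]  # same data as rdd4


def add_swapped(x):
    """Append (value, key) to a (key, value) tuple, as in the lambda above."""
    return x + (x[1], x[0])


def flat_map_values(items, f):
    """Hypothetical local model of flatMapValues: keys are repeated, never passed to f."""
    return [(k, out) for k, v in items for out in f(v)]


# map: exactly one output element per input element
mapped = [add_swapped(x) for x in pairs]

# flatMap: each result is an iterable whose items are spliced into one flat list
flat = list(chain.from_iterable(add_swapped(x) for x in pairs))

# flatMapValues with the identity function
flat_values = flat_map_values(nested, lambda x: x)
```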

## 5. Selecting Data

### Getting

In [6]:
rdd.collect() #Return a list with all RDD elements

[('a', 7), ('a', 2), ('b', 2)]

In [7]:
rdd.take(2) #Take first 2 RDD elements

[('a', 7), ('a', 2)]

In [8]:
rdd.first() #Take first RDD element

('a', 7)

In [9]:
rdd.top(2)  # Take the 2 largest RDD elements, in descending order

[('b', 2), ('a', 7)]
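`take(n)` returns the first elements in their existing order, while `top(n)` returns the largest ones. A local sketch with `heapq.nlargest` shows why `('b', 2)` comes first: tuples compare element by element, so the key decides before the value is looked at.

```python
import heapq

pairs = [('a', 7), ('a', 2), ('b', 2)]  # same data as rdd

# Like rdd.take(2): first two elements, no sorting
first_two = pairs[:2]

# Like rdd.top(2): the two largest elements, largest first
top_two = heapq.nlargest(2, pairs)
```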

### Sampling

In [10]:
rdd3.sample(False, 0.15, 81).collect()  # Sample about 15% of rdd3 without replacement, seed 81 (the sample size is not exact)

[3, 4, 27, 28, 35, 41, 43, 49, 53, 58, 85, 93]

### Filtering

In [11]:
rdd.filter(lambda x: "a" in x).collect()  # Keep only the elements (tuples) that contain "a"

[('a', 7), ('a', 2)]

In [12]:
rdd5.distinct().collect() #Return distinct RDD values

['a', 7, 2, 'b']

In [13]:
rdd.keys().collect() #Return (key,value) RDD's keys

['a', 'a', 'b']

## 6. Reshaping Data
https://spark.apache.org/docs/latest/api/python/reference/pyspark.html

### Reducing

In [14]:
rdd.reduceByKey(lambda x,y : x+y).collect() #Merge the rdd values for each key

[('a', 9), ('b', 2)]

In [15]:
rdd.reduce(lambda a, b: a + b)  # Merge all RDD elements; for tuples, + concatenates them

('a', 7, 'a', 2, 'b', 2)
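`reduceByKey` merges values per key, whereas `reduce` merges whole elements into a single result. The local sketch below mirrors both calls on the same data; `reduce_by_key` is a hypothetical helper, and Spark does not guarantee the order of the keys in its output.

```python
from functools import reduce

pairs = [('a', 7), ('a', 2), ('b', 2)]  # same data as rdd


def reduce_by_key(items, func):
    """Hypothetical local model of reduceByKey: merge values that share a key."""
    merged = {}
    for key, value in items:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())


by_key = reduce_by_key(pairs, lambda x, y: x + y)  # [('a', 9), ('b', 2)]
whole = reduce(lambda a, b: a + b, pairs)          # ('a', 7, 'a', 2, 'b', 2)
```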

### Grouping by

In [22]:
rdd3.groupBy(lambda x: x % 2)\
 .mapValues(list)\
 .collect()#Return RDD of grouped values

[(0, [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38,
      40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78,
      80, 82, 84, 86, 88, 90, 92, 94, 96, 98]),
 (1, [1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39,
      41, 43, 45, 47, 49, 51, 53, 55, 57, 59, 61, 63, 65, 67, 69, 71, 73, 75, 77, 79,
      81, 83, 85, 87, 89, 91, 93, 95, 97, 99])]

In [23]:
rdd.groupByKey().mapValues(list).collect() #Group rdd by key

[('a', [7, 2]), ('b', [2])]

### Aggregating

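`aggregate(zeroValue, seqOp, combOp)` works in two stages. First, within each partition, `seqOp` combines an accumulator (initially `zeroValue`) with each element in turn, producing a new accumulator. Then `combOp` merges the per-partition accumulators into a single result. In the example below the accumulator is a `(sum, count)` pair, so the result holds both the total and the number of elements.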

In [24]:
seqOp = (lambda x,y: (x[0]+y,x[1]+1))
combOp = (lambda x,y:(x[0]+y[0],x[1]+y[1]))
rdd3.aggregate((0,0),seqOp,combOp) #Aggregate RDD elements of each partition and then the results

(4950, 100)
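The two stages can be modeled in plain Python. This sketch is hypothetical: `local_aggregate` is not a Spark function, and splitting `range(100)` into two halves is an assumed partitioning. It shows `seqOp` building a `(sum, count)` accumulator per partition and `combOp` merging the partition results, seeded again with the zero value.

```python
from functools import reduce


def local_aggregate(partitions, zero, seq_op, comb_op):
    """Hypothetical model of RDD.aggregate over explicit partitions."""
    # Stage 1: fold each partition separately, starting from zero
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    # Stage 2: merge the per-partition accumulators
    return reduce(comb_op, per_partition, zero)


seqOp = lambda x, y: (x[0] + y, x[1] + 1)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])

data = list(range(100))
partitions = [data[:50], data[50:]]  # assumed: two partitions
result = local_aggregate(partitions, (0, 0), seqOp, combOp)
mean = result[0] / result[1]         # (sum, count) gives the mean directly
```

Because `(0, 0)` is neutral for `combOp`, the result does not depend on how the data is partitioned.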

In [26]:
rdd.aggregateByKey((0,0),seqOp,combOp).collect() #Aggregate values of each RDD key

[('a', (9, 2)), ('b', (2, 1))]

In [28]:
from operator import add
rdd3.fold(0,add) #Aggregate the elements of each partition, and then the results

4950

In [29]:
rdd.foldByKey(0, add).collect() #Merge the values for each key

[('a', 9), ('b', 2)]
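`fold` uses `zeroValue` to seed every partition and then once more when merging the partition results, so it should be the identity element of the operation (0 for `add`). A pure-Python sketch of that behaviour, assuming two partitions; `local_fold` is a hypothetical helper modeled on how PySpark's `fold` combines partition results.

```python
from functools import reduce
from operator import add


def local_fold(partitions, zero, op):
    """Hypothetical model of RDD.fold over explicit partitions."""
    # zero seeds the fold of every partition...
    per_partition = [reduce(op, part, zero) for part in partitions]
    # ...and seeds the final merge of the partition results once more
    return reduce(op, per_partition, zero)


data = list(range(100))
two_partitions = [data[:50], data[50:]]  # assumed partitioning

neutral = local_fold(two_partitions, 0, add)  # 4950, same as sum(data)
shifted = local_fold(two_partitions, 1, add)  # 4953: the zero is added 2 + 1 times
```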

In [30]:
rdd3.keyBy(lambda x: x+x).collect()  # Create (key, element) pairs, with the key computed by the function

[(0, 0), (2, 1), (4, 2), (6, 3), (8, 4), (10, 5), (12, 6), (14, 7), (16, 8), (18, 9),
 (20, 10), (22, 11), (24, 12), (26, 13), (28, 14), (30, 15), (32, 16), (34, 17), (36, 18), (38, 19),
 (40, 20), (42, 21), (44, 22), (46, 23), (48, 24), (50, 25), (52, 26), (54, 27), (56, 28), (58, 29),
 (60, 30), (62, 31), (64, 32), (66, 33), (68, 34), (70, 35), (72, 36), (74, 37), (76, 38), (78, 39),
 (80, 40), (82, 41), (84, 42), (86, 43), (88, 44), (90, 45), (92, 46), (94, 47), (96, 48), (98, 49),
 (100, 50), (102, 51), (104, 52), (106, 53), (108, 54), (110, 55), (112, 56), (114, 57), (116, 58), (118, 59),
 (120, 60), (122, 61), (124, 62), (126, 63), (128, 64), (130, 65), (132, 66), (134, 67), (136, 68), (138, 69),
 (140, 70), (142, 71), (144, 72), (146, 73), (148, 74), (150, 75), (152, 76), (154, 77), (156, 78), (158, 79),
 (160, 80), (162, 81), (164, 82), (166, 83), (168, 84), (170, 85), (172, 86), (174, 87), (176, 88), (178, 89),
 (180, 90), (182, 91), (184, 92), (186, 93), (188, 94), (190, 95), (192, 96), (194, 97), (196, 98), (198, 99)]

## 7. Mathematical Operations

In [32]:
rdd.subtract(rdd2).collect() #Return each rdd value not contained in rdd2

[('a', 7), ('b', 2)]

In [33]:
rdd2.subtractByKey(rdd).collect()  # Return each (key,value) pair of rdd2 with no matching key in rdd

[('d', 1)]
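`subtract` compares whole `(key, value)` elements, while `subtractByKey` looks at keys only. A local sketch on the same data makes the contrast visible: `('a', 2)` is removed by `subtract` because the identical pair exists in `rdd2`, whereas `subtractByKey` drops every pair of `rdd2` whose key appears anywhere in `rdd`.

```python
rdd_data = [('a', 7), ('a', 2), ('b', 2)]   # same data as rdd
rdd2_data = [('a', 2), ('d', 1), ('b', 1)]  # same data as rdd2

# Like rdd.subtract(rdd2): compares whole (key, value) elements
rdd2_elements = set(rdd2_data)
subtracted = [x for x in rdd_data if x not in rdd2_elements]

# Like rdd2.subtractByKey(rdd): compares keys only; values are ignored
rdd_keys = {k for k, _ in rdd_data}
subtracted_by_key = [(k, v) for k, v in rdd2_data if k not in rdd_keys]
```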

In [34]:
rdd.cartesian(rdd2).collect() #Return the Cartesian product of rdd and rdd2

[(('a', 7), ('a', 2)),
 (('a', 7), ('d', 1)),
 (('a', 7), ('b', 1)),
 (('a', 2), ('a', 2)),
 (('a', 2), ('d', 1)),
 (('a', 2), ('b', 1)),
 (('b', 2), ('a', 2)),
 (('b', 2), ('d', 1)),
 (('b', 2), ('b', 1))]
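The Cartesian product pairs every element of `rdd` with every element of `rdd2`, so its size is the product of the two sizes (3 * 3 = 9 here) and grows quickly on real data. Locally, `itertools.product` yields the same pairs; Spark's output order depends on partitioning and is not guaranteed in general.

```python
from itertools import product

rdd_data = [('a', 7), ('a', 2), ('b', 2)]   # same data as rdd
rdd2_data = [('a', 2), ('d', 1), ('b', 1)]  # same data as rdd2

# Local equivalent of rdd.cartesian(rdd2).collect()
cartesian = list(product(rdd_data, rdd2_data))
print(len(cartesian))  # 9
```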

## 8. Others

### Iterating

In [44]:
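def g(x):
    print(x)

# foreach runs g on the executors and returns None, so the printed output typically
# appears in the executor logs (or, in local mode, the terminal that started Python), not in the notebook
rdd.foreach(g)  # Apply a function to all RDD elements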

### Sort

In [46]:
rdd2.sortBy(lambda x: x[1]).collect() #Sort RDD by given function 

[('d', 1), ('b', 1), ('a', 2)]

In [47]:
rdd2.sortByKey().collect() #Sort (key, value) RDD by key

[('a', 2), ('b', 1), ('d', 1)]

### Repartitioning

In [None]:
rdd.repartition(4)  # Return a new RDD with 4 partitions (always performs a full shuffle)
rdd.coalesce(1)     # Return a new RDD with the number of partitions decreased to 1, avoiding a full shuffle

### Saving

In [None]:
rdd.saveAsTextFile("rdd.txt")  # Writes a directory named rdd.txt with one part file per partition

rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child", 'org.apache.hadoop.mapred.TextOutputFormat')

In [None]:
#Stopping SparkContext
sc.stop()

In [None]:
# Execution
'''
$ ./bin/spark-submit examples/src/main/python/pi.py
'''

# DataFrame & SQL

## 1. Initializing SparkSession

A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
 .builder \
 .appName("Python Spark SQL basic example") \
 .config("spark.some.config.option", "some-value") \
 .getOrCreate()

In [2]:
spark

## 2. Creating DataFrames

### From RDDs

In [None]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType

# Infer Schema
sc = spark.sparkContext
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
peopledf = spark.createDataFrame(people)

# Specify Schema
# Every field is declared as StringType, so the values must be strings as well
people = parts.map(lambda p: (p[0], p[1].strip()))
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
spark.createDataFrame(people, schema).show()

### From Spark Data Sources

In [None]:
# JSON
df = spark.read.json("customer.json")
df.show()
# +--------------------+---+---------+--------+--------------------+
# |             address|age|firstName|lastName|         phoneNumber|
# +--------------------+---+---------+--------+--------------------+
# |[New York,10021,N...| 25|     John|   Smith|[[212 555-1234,ho...|
# |[New York,10021,N...| 21|     Jane|     Doe|[[322 888-1234,ho...|
# +--------------------+---+---------+--------+--------------------+
df2 = spark.read.load("people.json", format="json")
                                        
# Parquet files
df3 = spark.read.load("users.parquet")
# TXT files
df4 = spark.read.text("people.txt")

## 3. Inspect Data

In [None]:
df.dtypes #Return df column names and data types
df.show() #Display the content of df
df.head(2)   # Return the first n rows (df.head() with no argument returns only the first row)
df.first()   # Return the first row
df.take(2)   # Return the first n rows
df.schema    # Return the schema of df

In [None]:
df.describe().show()# Compute summary statistics
df.columns #Return the columns of df
df.count() #Count the number of rows in df
df.distinct().count() #Count the number of distinct rows in df
df.printSchema() #Print the schema of df
df.explain() #Print the (logical and physical) plans

In [None]:
df = df.dropDuplicates() #Drop duplicates

## 4. Queries

In [None]:
from pyspark.sql import functions as F

### Select

In [None]:
df.select("firstName").show() #Show all entries in firstName column

df.select("firstName","lastName") \
 .show()

df.select("firstName", "age", F.explode("phoneNumber").alias("contactInfo")) \
 .select("contactInfo.type", "firstName", "age") \
 .show()  # Show all entries in firstName, age and type


df.select(df["firstName"],df["age"]+ 1).show() #Show all entries in firstName and age, add 1 to the entries of age


df.select(df['age'] > 24).show()  # Show a TRUE/FALSE column: whether age > 24 (use filter() to keep only matching rows)

### When

In [None]:
df.select("firstName", F.when(df.age > 30, 1).otherwise(0)).show() #Show firstName and 0 or 1 depending on age >30

df[df.firstName.isin("Jane", "Boris")].collect()  # Return the rows whose firstName is one of the given values

### Like

In [None]:
df.select("firstName", df.lastName.like("Smith")).show()  # Show firstName, and TRUE if lastName matches the SQL LIKE pattern "Smith"
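A LIKE pattern without wildcards, such as `"Smith"`, only matches the exact string; `%` matches any run of characters and `_` matches exactly one. The sketch below is a hypothetical pure-Python helper that translates such a pattern into a regular expression, assuming only `%` and `_` wildcards and ignoring escape characters; it is meant to illustrate the matching rules, not to reproduce Spark's implementation.

```python
import re


def like(value, pattern):
    """Hypothetical helper: SQL LIKE matching with % and _ only (no escape characters)."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")           # any sequence of characters
        elif ch == "_":
            parts.append(".")            # exactly one character
        else:
            parts.append(re.escape(ch))  # literal character
    return re.fullmatch("".join(parts), value, re.DOTALL) is not None


names = ["Smith", "Smithers", "Doe"]
print([like(n, "Smith") for n in names])   # [True, False, False]
print([like(n, "Smith%") for n in names])  # [True, True, False]
print([like(n, "D_e") for n in names])     # [False, False, True]
```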

### Startswith - Endswith

In [None]:
df.select("firstName",df.lastName.startswith("Sm")).show() #Show firstName, and TRUE if lastName starts with Sm

df.select(df.lastName.endswith("th")).show()  # Show TRUE/FALSE: whether lastName ends with th

### Substring

In [None]:
df.select(df.firstName.substr(1, 3).alias("name")).collect() #Return substrings of firstName

### Between

In [None]:
df.select(df.age.between(22, 24)).show()  # Show age as TRUE/FALSE: whether it lies between 22 and 24 (inclusive)

## 5. Add, Update & Remove Columns

In [None]:
# Add
# Explode phoneNumber once, so that each row keeps a matching number and type
df = df.withColumn('city', df.address.city) \
 .withColumn('postalCode', df.address.postalCode) \
 .withColumn('state', df.address.state) \
 .withColumn('streetAddress', df.address.streetAddress) \
 .withColumn('phone', F.explode(df.phoneNumber)) \
 .withColumn('telePhoneNumber', F.col('phone.number')) \
 .withColumn('telePhoneType', F.col('phone.type')) \
 .drop('phone')

In [None]:
# Update
df = df.withColumnRenamed('telePhoneNumber', 'phoneNumber')

In [None]:
# Remove
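df = df.drop("address", "phoneNumber")  # Drop by column name
# Or, equivalently, drop by Column reference (run one form or the other, not both):
# df = df.drop(df.address).drop(df.phoneNumber)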

## 6. Others

### GroupBy

In [None]:
df.groupBy("age").count().show()  # Group by age, count the members in the groups

### Filter

In [None]:
df.filter(df["age"]>24).show() #Filter entries of age, only keep those records of which the values are >24

### Sort

In [None]:
peopledf.sort(peopledf.age.desc()).collect()

df.sort("age", ascending=False).collect()

df.orderBy(["age","city"],ascending=[0,1]).collect()
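In `orderBy(["age","city"], ascending=[0,1])` each list entry sets the direction for one column: `0` (False) sorts `age` descending and `1` (True) sorts `city` ascending among rows with equal age. A pure-Python equivalent on hypothetical rows (the values below are made up for illustration and do not come from `customer.json`):

```python
# Hypothetical rows, made up for illustration only
rows = [
    {"firstName": "John", "age": 25, "city": "New York"},
    {"firstName": "Jane", "age": 21, "city": "Boston"},
    {"firstName": "Ann", "age": 25, "city": "Albany"},
]

# Like df.orderBy(["age", "city"], ascending=[0, 1]):
# negate the numeric key for descending order, keep city ascending
ordered = sorted(rows, key=lambda r: (-r["age"], r["city"]))
print([(r["age"], r["city"]) for r in ordered])
# [(25, 'Albany'), (25, 'New York'), (21, 'Boston')]
```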

### Missing & Replacing Values

In [None]:
df.na.fill(50).show() #Replace null values

df.na.drop().show() #Return new df omitting rows with null values

df.na.replace(10, 20).show()  # Return new df replacing one value with another

### Repartitioning

In [None]:
df.repartition(10).rdd.getNumPartitions() # df with 10 partitions

df.coalesce(1).rdd.getNumPartitions() #df with 1 partition

## 7. Running SQL Queries Programmatically

In [None]:
# Registering DataFrames as Views
peopledf.createGlobalTempView("people")

df.createTempView("customer")

df.createOrReplaceTempView("customer")

In [None]:
# Query Views
df5 = spark.sql("SELECT * FROM customer")
df5.show()
peopledf2 = spark.sql("SELECT * FROM global_temp.people")
peopledf2.show()

## 8. Output

In [None]:
# Data Structures

rdd1 = df.rdd #Convert df into an RDD

df.toJSON().first()  # Convert df into an RDD of JSON strings and return the first one

df.toPandas()  # Return the contents of df as a pandas DataFrame (collects all rows to the driver)

In [None]:
# Write & Save to Files
df.select("firstName", "city")\
 .write \
 .save("nameAndCity.parquet")

df.select("firstName", "age") \
 .write \
 .save("namesAndAges.json",format="json")

In [None]:
# Stopping SparkSession
spark.stop()