#### always add the following cell to the start of a notebook when using spark

In [1]:
# lets start the spark session
# the entry point for an spark app is the SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[2]").appName("FirstApp").getOrCreate()

In [2]:
# if you don't get an output here it means that jupyter isn't connected to pyspark
spark

#### use this to debug any errors related to wrong path/file not found

In [3]:
import os
os.getcwd()
# os.path.abspath(os.getcwd())

'/home/ldobre/das_2021/lesson3'

# Dataframes

we can create a dataframe from a list that we parallelize

In [4]:
data = [
    ('1', 'JS', 179),
    ('2', 'CL', 175),
    ('3', 'AS', 140),
    ('4', 'LF', 170)
]
df = spark.createDataFrame(
        data, 
        ['Id', 'Name', 'Height']  # column list
    ) 

In [5]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Height: long (nullable = true)



In [6]:
df.show(10)  # default 20 rows

+---+----+------+
| Id|Name|Height|
+---+----+------+
|  1|  JS|   179|
|  2|  CL|   175|
|  3|  AS|   140|
|  4|  LF|   170|
+---+----+------+



In [7]:
# we can retrieve a subset of the df using head
df.head(2)

[Row(Id='1', Name='JS', Height=179), Row(Id='2', Name='CL', Height=175)]

In [8]:
type(df.head(2))

list

In [9]:
df.head(2)[0][2]

179

In [10]:
# we can also pass the schema 
from pyspark.sql.types import *

In [11]:
schema = StructType([
    # StructField("column_name", columnType(), Nullable),
    StructField("id", StringType(), False),
    StructField("name", StringType(), True),
    StructField("height", IntegerType(), False)
])

In [12]:
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()

root
 |-- id: string (nullable = false)
 |-- name: string (nullable = true)
 |-- height: integer (nullable = false)



## SPARK.READ

usually we want to create a df from a data source.
Spark can read from the following sources

## CSV
spark.read.csv

usefull when reading from delimited files

In [13]:
csv_path = '../data/airports.text'

In [14]:
df = spark.read.csv(
    csv_path,
    # header=True,
    inferSchema=True  # affects performance as data as parsed a second time to inferSchema
)

In [15]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)



In [16]:
# describe() can be used to glance over the data statics
df.describe().show()

+-------+------------------+--------------+--------------+-----------+----+----+-----------------+-------------------+------------------+------------------+----+--------------+
|summary|               _c0|           _c1|           _c2|        _c3| _c4| _c5|              _c6|                _c7|               _c8|               _c9|_c10|          _c11|
+-------+------------------+--------------+--------------+-----------+----+----+-----------------+-------------------+------------------+------------------+----+--------------+
|  count|              8107|          8107|          8107|       8107|5880|8044|             8107|               8107|              8107|              8107|8107|          8107|
|   mean|4766.3610460096215|           NaN|           NaN|       null| NaN|null|26.81772048414372|-3.9219686495380888| 933.4493647465154|0.1692364623165166|null|          null|
| stddev| 2943.205192743097|           NaN|           NaN|       null| NaN|null|27.86695318132571|  85.900872757107

In [17]:
df.show()

+---+--------------------+--------------+----------------+---+----+---------+----------+----+----+----+--------------------+
|_c0|                 _c1|           _c2|             _c3|_c4| _c5|      _c6|       _c7| _c8| _c9|_c10|                _c11|
+---+--------------------+--------------+----------------+---+----+---------+----------+----+----+----+--------------------+
|  1|              Goroka|        Goroka|Papua New Guinea|GKA|AYGA|-6.081689|145.391881|5282|10.0|   U|Pacific/Port_Moresby|
|  2|              Madang|        Madang|Papua New Guinea|MAG|AYMD|-5.207083|  145.7887|  20|10.0|   U|Pacific/Port_Moresby|
|  3|         Mount Hagen|   Mount Hagen|Papua New Guinea|HGU|AYMH|-5.826789|144.295861|5388|10.0|   U|Pacific/Port_Moresby|
|  4|              Nadzab|        Nadzab|Papua New Guinea|LAE|AYNZ|-6.569828|146.726242| 239|10.0|   U|Pacific/Port_Moresby|
|  5|Port Moresby Jack...|  Port Moresby|Papua New Guinea|POM|AYPY|-9.443383| 147.22005| 146|10.0|   U|Pacific/Port_Moresby|


using the output from the previous 2 cells, build a schema and pass it at read

In [18]:
csv_schema = StructType([
    # StructField("column_name", columnType(), Nullable),
    # edit this and add the columns
])

In [19]:
df = spark.read.csv(csv_path, schema=csv_schema)

In [20]:
df.show()

++
||
++
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
||
++
only showing top 20 rows



## TEXT
spark.read.text

similar to spark.Context.textFile

In [21]:
text_path = '../data/word_count.text'

In [22]:
df = spark.read.text(text_path)

In [23]:
df.show()

+--------------------+
|               value|
+--------------------+
|Liverpool Footbal...|
|                    |
|Founded in 1892, ...|
|                    |
|One of the most w...|
|                    |
|The club's suppor...|
|                    |
|Liverpool F.C. wa...|
|                    |
|Liverpool played ...|
|                    |
|Liverpool reached...|
|Statue of a man w...|
|Statue of Bill Sh...|
|                    |
|The club was prom...|
|                    |
|Paisley retired i...|
|3 burgundy tablet...|
+--------------------+
only showing top 20 rows



In [24]:
help(spark.read.text)

Help on method text in module pyspark.sql.readwriter:

text(paths, wholetext=False, lineSep=None, pathGlobFilter=None, recursiveFileLookup=None) method of pyspark.sql.readwriter.DataFrameReader instance
    Loads text files and returns a :class:`DataFrame` whose schema starts with a
    string column named "value", and followed by partitioned columns if there
    are any.
    The text files must be encoded as UTF-8.
    
    By default, each line in the text file is a new row in the resulting DataFrame.
    
    :param paths: string, or list of strings, for input path(s).
    :param wholetext: if true, read each file from input path(s) as a single row.
    :param lineSep: defines the line separator that should be used for parsing. If None is
                    set, it covers all ``\r``, ``\r\n`` and ``\n``.
    :param pathGlobFilter: an optional glob pattern to only include files with paths matching
                           the pattern. The syntax follows `org.apache.hadoop.fs.GlobF

## JSON
spark.read.json

In [25]:
json_path = '../data/resource_hvrh-b6nb.json'

In [26]:
df = spark.read.json(json_path)

In [27]:
df.printSchema()

root
 |-- dropoff_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- ratecodeid: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- vendorid: string (nullable = true)



as long as they have a valid schema the json can be different

In [28]:
jsonStrings  = ['{"uploadTimeStamp":"1500618037189","ID":"123ID","data":[{"Data":{"unit":"rpm","value":"0"},"EventID":"E1","Timestamp":1500618037189,"pii":{}},{"Data":{"heading":"N","loc1":"false","loc2":"13.022425","loc3":"77.760587","loc4":"false","speed":"10"},"EventID":"E2","Timestamp":1500618037189,"pii":{}},{"Data":{"x":"1.1","y":"1.2","z":"2.2"},"EventID":"E3","Timestamp":1500618037189,"pii":{}},{"EventID":"E4","Data":{"value":"50","unit":"percentage"},"Timestamp":1500618037189},{"Data":{"unit":"kmph","value":"60"},"EventID":"E5","Timestamp":1500618037189,"pii":{}}]}',
 '{"uploadTimeStamp":"1500618045735","ID":"123ID","data":[{"Data":{"unit":"rpm","value":"0"},"EventID":"E1","Timestamp":1500618045735,"pii":{}},{"Data":{"heading":"N","loc1":"false","loc2":"13.022425","loc3":"77.760587","loc4":"false","speed":"10"},"EventID":"E2","Timestamp":1500618045735,"pii":{}},{"Data":{"x":"1.1","y":"1.2","z":"2.2"},"EventID":"E3","Timestamp":1500618045735,"pii":{}},{"EventID":"E4","Data":{"value":"50","unit":"percentage"},"Timestamp":1500618045735},{"Data":{"unit":"kmph","value":"60"},"EventID":"E5","Timestamp":1500618045735,"pii":{}}]}',
 '{"REGULAR_DUMMY":"REGULAR_DUMMY", "ID":"123ID", "uploadTimeStamp":1500546893837}',
 '{"REGULAR_DUMMY":"text_of_json_per_item_in_list"}'
]

In [29]:
jsonRDD = spark.sparkContext.parallelize(jsonStrings)
df = spark.read.json(jsonRDD)
df.show()

+-----+--------------------+--------------------+---------------+
|   ID|       REGULAR_DUMMY|                data|uploadTimeStamp|
+-----+--------------------+--------------------+---------------+
|123ID|                null|[[[,,,,,, rpm, 0,...|  1500618037189|
|123ID|                null|[[[,,,,,, rpm, 0,...|  1500618045735|
|123ID|       REGULAR_DUMMY|                null|  1500546893837|
| null|text_of_json_per_...|                null|           null|
+-----+--------------------+--------------------+---------------+



In [30]:
# the schema of the json is merged
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- REGULAR_DUMMY: string (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Data: struct (nullable = true)
 |    |    |    |-- heading: string (nullable = true)
 |    |    |    |-- loc1: string (nullable = true)
 |    |    |    |-- loc2: string (nullable = true)
 |    |    |    |-- loc3: string (nullable = true)
 |    |    |    |-- loc4: string (nullable = true)
 |    |    |    |-- speed: string (nullable = true)
 |    |    |    |-- unit: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |    |    |-- x: string (nullable = true)
 |    |    |    |-- y: string (nullable = true)
 |    |    |    |-- z: string (nullable = true)
 |    |    |-- EventID: string (nullable = true)
 |    |    |-- Timestamp: long (nullable = true)
 |-- uploadTimeStamp: string (nullable = true)



In [31]:
# Starting with Spark 2.2 you can read a multiline json
# ideally you want to receive the json on a single line

In [32]:
m_json = '../data/multiline.json'

In [34]:
spark.read.json(m_json).printSchema()

root
 |-- _corrupt_record: string (nullable = true)



In [35]:
spark.read.json(m_json).show()

AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).json(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).json(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().;

In [36]:
spark.read.json(m_json, multiLine=True).printSchema()

root
 |-- age: long (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)



In [38]:
df = df.filter(df['data'].isNotNull()).drop('REGULAR_DUMMY')

In [39]:
df.select('data').show(20, False)  # why did I used False here?!

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|data                                                                                                                                                                                                                                                       |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[[[,,,,,, rpm, 0,,,], E1, 1500618037189], [[N, false, 13.022425, 77.760587, false, 10,,,,,], E2, 1500618037189], [[,,,,,,,, 1.1, 1.2, 2.2], E3, 1500618037189], [[,,,,,, percentage, 50,,,], E4, 1500618037189], [[,,,,,, kmph, 60,,,], E5, 1

In [40]:
df.select('data').printSchema()

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Data: struct (nullable = true)
 |    |    |    |-- heading: string (nullable = true)
 |    |    |    |-- loc1: string (nullable = true)
 |    |    |    |-- loc2: string (nullable = true)
 |    |    |    |-- loc3: string (nullable = true)
 |    |    |    |-- loc4: string (nullable = true)
 |    |    |    |-- speed: string (nullable = true)
 |    |    |    |-- unit: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |    |    |-- x: string (nullable = true)
 |    |    |    |-- y: string (nullable = true)
 |    |    |    |-- z: string (nullable = true)
 |    |    |-- EventID: string (nullable = true)
 |    |    |-- Timestamp: long (nullable = true)



In [41]:
df.select('data.Data.speed').show(20, False)

+---------+
|speed    |
+---------+
|[, 10,,,]|
|[, 10,,,]|
+---------+



In [42]:
df.select('data.Data.speed').printSchema()

root
 |-- speed: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [43]:
# exploding nested jsons fields is a "hard" problem in spark
from pyspark.sql.functions import explode, arrays_zip
df.select(explode(arrays_zip('data'))).show(20, False)

+-----------------------------------------------------------------------+
|col                                                                    |
+-----------------------------------------------------------------------+
|[[[,,,,,, rpm, 0,,,], E1, 1500618037189]]                              |
|[[[N, false, 13.022425, 77.760587, false, 10,,,,,], E2, 1500618037189]]|
|[[[,,,,,,,, 1.1, 1.2, 2.2], E3, 1500618037189]]                        |
|[[[,,,,,, percentage, 50,,,], E4, 1500618037189]]                      |
|[[[,,,,,, kmph, 60,,,], E5, 1500618037189]]                            |
|[[[,,,,,, rpm, 0,,,], E1, 1500618045735]]                              |
|[[[N, false, 13.022425, 77.760587, false, 10,,,,,], E2, 1500618045735]]|
|[[[,,,,,,,, 1.1, 1.2, 2.2], E3, 1500618045735]]                        |
|[[[,,,,,, percentage, 50,,,], E4, 1500618045735]]                      |
|[[[,,,,,, kmph, 60,,,], E5, 1500618045735]]                            |
+-------------------------------------

In [44]:
df.select(explode(arrays_zip('data'))).printSchema()

root
 |-- col: struct (nullable = false)
 |    |-- data: struct (nullable = true)
 |    |    |-- Data: struct (nullable = true)
 |    |    |    |-- heading: string (nullable = true)
 |    |    |    |-- loc1: string (nullable = true)
 |    |    |    |-- loc2: string (nullable = true)
 |    |    |    |-- loc3: string (nullable = true)
 |    |    |    |-- loc4: string (nullable = true)
 |    |    |    |-- speed: string (nullable = true)
 |    |    |    |-- unit: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |    |    |-- x: string (nullable = true)
 |    |    |    |-- y: string (nullable = true)
 |    |    |    |-- z: string (nullable = true)
 |    |    |-- EventID: string (nullable = true)
 |    |    |-- Timestamp: long (nullable = true)



## JDBC
spark.read.jdbc

depending on the number of partitions, the db will receive multiple connections. This might make the db unresponsive.

used less in big projects

the code below is just an example. read the following article for more details about jdbc reads
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md#reading-data-using-jdbc-source

In [45]:
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()

Py4JJavaError: An error occurred while calling o184.load.
: java.sql.SQLException: No suitable driver
	at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:105)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:105)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:35)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


## Parquet
spark.read.parquet

https://databricks.com/glossary/what-is-parquet

In [None]:
df = spark.read.parquet(parquet_path)

read more about partition discovery
https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery

## FORMAT & LOAD
generic way of reading data from the above data sources

In [None]:
df = spark.read.format("parquet").load(parquet_path)

In [None]:
df = spark.read.format('jdbc').option().load()

In [None]:
df = spark.read.format('csv').option().load()

usefull when developing frameworks (reading metadata and using generic ETL)

## Write
same as read, with additional options related to number of partitions.

assuming df is the final dataframe, you can do something like in the cells below

read the entire list of options at
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

In [None]:
df.write.csv(output_csv)

In [None]:
df.write.parquet(output_parquet)

In [None]:
df.write.json(output_json)

In [None]:
df.write.jdbc()

In [None]:
df.format('parquet|jdbc|json').option().save()

## Transformations

In [46]:
csv_file = '../data/uk-postcode.csv'
df = spark.read.csv(csv_file, header = True, inferSchema=True)

In [47]:
df.show()

+--------+--------+---------+-------+--------+--------+--------------------+-------------+---------+----------------+----------+----------+
|Postcode|Latitude|Longitude|Easting|Northing| GridRef|           Town/Area|       Region|Postcodes|Active postcodes|Population|Households|
+--------+--------+---------+-------+--------+--------+--------------------+-------------+---------+----------------+----------+----------+
|     AB1| 57.1269| -2.13644| 391839|  804005|NJ918040|            Aberdeen|     Aberdeen|     2655|               0|      null|      null|
|     AB2| 57.1713| -2.14152| 391541|  808948|NJ915089|            Aberdeen|     Aberdeen|     3070|               0|      null|      null|
|     AB3| 57.0876| -2.59624| 363963|  799780|NO639997|            Aberdeen|     Aberdeen|     2168|               0|      null|      null|
|     AB4| 57.5343| -2.12713| 392487|  849358|NJ924493|Fraserburgh, Pete...|     Aberdeen|     2956|               0|      null|      null|
|     AB5| 57.4652| 

In [48]:
df.describe().show(5, False)

+-------+--------+------------------+------------------+------------------+-----------------+--------+-----------------+--------+-----------------+-----------------+------------------+------------------+
|summary|Postcode|Latitude          |Longitude         |Easting           |Northing         |GridRef |Town/Area        |Region  |Postcodes        |Active postcodes |Population        |Households        |
+-------+--------+------------------+------------------+------------------+-----------------+--------+-----------------+--------+-----------------+-----------------+------------------+------------------+
|count  |3107    |3094              |3094              |3082              |3082             |3082    |3107             |3106    |3086             |3086             |2814              |2814              |
|mean   |null    |53.034849482870136|-2.051575161550915|399520.80012978584|351774.8997404283|null    |null             |null    |832.2216461438755|564.9565780946209|22437.184434968018|

In [49]:
+-------+---------+------------------+------------------+------------------+-----------------+--------+-----------------+--------+-----------------+-----------------+------------------+------------------+
|summary|Post Code|Latitude          |Longitude         |Easting           |Northing         |GridRef |Town/Area        |Region  |Postcodes        |Active postcodes |Population        |Households        |
+-------+---------+------------------+------------------+------------------+-----------------+--------+-----------------+--------+-----------------+-----------------+------------------+------------------+
|count  |3107     |3094              |3094              |3082              |3082             |3082    |3107             |3106    |3086             |3086             |2814              |2814              |
|mean   |null     |53.034849482870136|-2.051575161550915|399520.80012978584|351774.8997404283|null    |null             |null    |832.2216461438755|564.9565780946209|22437.184434968018|9390.271144278608 |
|stddev |null     |1.8865014315147148|1.8334605907478179|121798.85778550198|209187.830957896 |null    |null             |null    |600.2495165657779|397.5467297411277|16578.512623860708|6814.9887522729805|
|min    |AB1      |49.1995           |-7.82593          |22681             |8307             |HU390111|Abbey Hey, Gorton|Aberdeen|1                |0                |2                 |1                 |
|max    |ZE3      |60.3156           |1.73337           |653560            |1159304          |TV604994|York City Centre |York    |3621             |2644             |153812            |61886             |
+-------+---------+------------------+------------------+------------------+-----------------+--------+-----------------+--------+-----------------+-----------------+------------------+------------------+


SyntaxError: invalid syntax (<ipython-input-49-8db1ddc8287c>, line 1)

In [50]:
# select fields
df.select('Postcode', 'Latitude').show()

+--------+--------+
|Postcode|Latitude|
+--------+--------+
|     AB1| 57.1269|
|     AB2| 57.1713|
|     AB3| 57.0876|
|     AB4| 57.5343|
|     AB5| 57.4652|
|     AB9| 57.1466|
|    AB10| 57.1348|
|    AB11| 57.1371|
|    AB12| 57.1033|
|    AB13| 57.1127|
|    AB14| 57.1033|
|    AB15| 57.1388|
|    AB16| 57.1596|
|    AB21| 57.2091|
|    AB22| 57.1864|
|    AB23| 57.2088|
|    AB24| 57.1634|
|    AB25| 57.1534|
|    AB30|  56.846|
|    AB31| 57.0672|
+--------+--------+
only showing top 20 rows



In [51]:
# rename column
df = df.withColumnRenamed('Postcode', 'Post Code')
df.show()

+---------+--------+---------+-------+--------+--------+--------------------+-------------+---------+----------------+----------+----------+
|Post Code|Latitude|Longitude|Easting|Northing| GridRef|           Town/Area|       Region|Postcodes|Active postcodes|Population|Households|
+---------+--------+---------+-------+--------+--------+--------------------+-------------+---------+----------------+----------+----------+
|      AB1| 57.1269| -2.13644| 391839|  804005|NJ918040|            Aberdeen|     Aberdeen|     2655|               0|      null|      null|
|      AB2| 57.1713| -2.14152| 391541|  808948|NJ915089|            Aberdeen|     Aberdeen|     3070|               0|      null|      null|
|      AB3| 57.0876| -2.59624| 363963|  799780|NO639997|            Aberdeen|     Aberdeen|     2168|               0|      null|      null|
|      AB4| 57.5343| -2.12713| 392487|  849358|NJ924493|Fraserburgh, Pete...|     Aberdeen|     2956|               0|      null|      null|
|      AB5| 5

In [52]:
df.schema.fieldNames()

['Post Code',
 'Latitude',
 'Longitude',
 'Easting',
 'Northing',
 'GridRef',
 'Town/Area',
 'Region',
 'Postcodes',
 'Active postcodes',
 'Population',
 'Households']

In [53]:
from pyspark.sql.functions import col, when
df = df.withColumn('type', when(col('Population') < 10000, 'village').when(df['Population'] < 20000, 'town').otherwise('city').alias('type'))

In [54]:
df.select('type').distinct().show()

+-------+
|   type|
+-------+
|   city|
|   town|
|village|
+-------+



In [55]:
# if you write a condition like this, is easier to read it
df = df.withColumn('schema',
                   when(col('Population').isNull(), None)\
                   .when(col('Population') < 10000, 'village')\
                   .when(df['Population'] < 20000, 'town')\
                   .otherwise('city'))

In [56]:
df.filter(df.schema.isNull()).show(5)
# why do we have an error here?

AttributeError: 'StructType' object has no attribute 'isNull'

In [57]:
df.filter(df['schema'].isNull()).show(5)

+---------+--------+---------+-------+--------+--------+--------------------+--------+---------+----------------+----------+----------+----+------+
|Post Code|Latitude|Longitude|Easting|Northing| GridRef|           Town/Area|  Region|Postcodes|Active postcodes|Population|Households|type|schema|
+---------+--------+---------+-------+--------+--------+--------------------+--------+---------+----------------+----------+----------+----+------+
|      AB1| 57.1269| -2.13644| 391839|  804005|NJ918040|            Aberdeen|Aberdeen|     2655|               0|      null|      null|city|  null|
|      AB2| 57.1713| -2.14152| 391541|  808948|NJ915089|            Aberdeen|Aberdeen|     3070|               0|      null|      null|city|  null|
|      AB3| 57.0876| -2.59624| 363963|  799780|NO639997|            Aberdeen|Aberdeen|     2168|               0|      null|      null|city|  null|
|      AB4| 57.5343| -2.12713| 392487|  849358|NJ924493|Fraserburgh, Pete...|Aberdeen|     2956|               0

In [None]:
+---------+--------+---------+-------+--------+--------+--------------------+--------+---------+----------------+----------+----------+----+
|Post Code|Latitude|Longitude|Easting|Northing| GridRef|           Town/Area|  Region|Postcodes|Active postcodes|Population|Households|type|
+---------+--------+---------+-------+--------+--------+--------------------+--------+---------+----------------+----------+----------+----+
|      AB1| 57.1269| -2.13644| 391839|  804005|NJ918040|            Aberdeen|Aberdeen|     2655|               0|      null|      null|city|
|      AB2| 57.1713| -2.14152| 391541|  808948|NJ915089|            Aberdeen|Aberdeen|     3070|               0|      null|      null|city|
|      AB3| 57.0876| -2.59624| 363963|  799780|NO639997|            Aberdeen|Aberdeen|     2168|               0|      null|      null|city|
|      AB4| 57.5343| -2.12713| 392487|  849358|NJ924493|Fraserburgh, Pete...|Aberdeen|     2956|               0|      null|      null|city|
|      AB5| 57.4652| -2.64764| 361248|  841843|NJ612418|Buckie, Huntly, I...|Aberdeen|     3002|               0|      null|      null|city|
+---------+--------+---------+-------+--------+--------+--------------------+--------+---------+----------------+----------+----------+----+

In [58]:
# replace null
df.na.fill('').filter(df['GridRef'] == 'SJ261898').show()

+---------+--------+---------+-------+--------+--------+--------------------+------+---------+----------------+----------+----------+----+------+
|Post Code|Latitude|Longitude|Easting|Northing| GridRef|           Town/Area|Region|Postcodes|Active postcodes|Population|Households|type|schema|
+---------+--------+---------+-------+--------+--------+--------------------+------+---------+----------------+----------+----------+----+------+
|     CH28| 53.4005| -3.11196| 326165|  389873|SJ261898|Non-geographic, M...|Wirral|       37|               4|      null|      null|city|      |
+---------+--------+---------+-------+--------+--------+--------------------+------+---------+----------------+----------+----------+----+------+



In [59]:
df.na.fill('ThisWasNULL').filter(df['GridRef'] == 'SJ261898').show()

+---------+--------+---------+-------+--------+--------+--------------------+------+---------+----------------+----------+----------+----+-----------+
|Post Code|Latitude|Longitude|Easting|Northing| GridRef|           Town/Area|Region|Postcodes|Active postcodes|Population|Households|type|     schema|
+---------+--------+---------+-------+--------+--------+--------------------+------+---------+----------------+----------+----------+----+-----------+
|     CH28| 53.4005| -3.11196| 326165|  389873|SJ261898|Non-geographic, M...|Wirral|       37|               4|      null|      null|city|ThisWasNULL|
+---------+--------+---------+-------+--------+--------+--------------------+------+---------+----------------+----------+----------+----+-----------+



In [60]:
df.na.drop().filter(df['GridRef'] == 'SJ261898').show()

+---------+--------+---------+-------+--------+-------+---------+------+---------+----------------+----------+----------+----+------+
|Post Code|Latitude|Longitude|Easting|Northing|GridRef|Town/Area|Region|Postcodes|Active postcodes|Population|Households|type|schema|
+---------+--------+---------+-------+--------+-------+---------+------+---------+----------------+----------+----------+----+------+
+---------+--------+---------+-------+--------+-------+---------+------+---------+----------------+----------+----------+----+------+



In [61]:
df.select('schema').na.fill('').replace({'town': 'Town'}).distinct().collect()

[Row(schema='city'), Row(schema='village'), Row(schema=''), Row(schema='Town')]

In [62]:
df.withColumn('extra_column', 'literal_value').printSchema()

AssertionError: col should be Column

In [64]:
from pyspark.sql.functions import lit
df.withColumn('extra_column', lit('literal_value')).printSchema()

root
 |-- Post Code: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Easting: integer (nullable = true)
 |-- Northing: integer (nullable = true)
 |-- GridRef: string (nullable = true)
 |-- Town/Area: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Postcodes: integer (nullable = true)
 |-- Active postcodes: integer (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Households: integer (nullable = true)
 |-- type: string (nullable = false)
 |-- schema: string (nullable = true)
 |-- extra_column: string (nullable = false)



In [65]:
# group by

In [66]:
ag = df.groupby('Region', 'Town/Area')

In [67]:
print(ag.count(), df.count())

DataFrame[Region: string, Town/Area: string, count: bigint] 3107


In [68]:
df.groupby('Region').count().show()

+--------------------+-----+
|              Region|count|
+--------------------+-----+
|           Worcester|   11|
|           Charnwood|    2|
|      North Kesteven|    2|
|                Arun|    3|
|       Epping Forest|    6|
|             Waveney|    3|
|              Stroud|    5|
| Nuneaton & Bedworth|    3|
|          New Forest|    8|
|           Newmarket|    1|
|              Maldon|    2|
|           Sedgemoor|    1|
|            Worthing|    6|
|            Brighton|    1|
|           Guildford|    8|
|              Bolton|    9|
|Central Bedfordshire|    8|
|      North Tyneside|    7|
|           Bradford |    1|
|        Surrey Heath|    5|
+--------------------+-----+
only showing top 20 rows



In [69]:
df.groupby('Region', 'type').sum().show()

+--------------------+-------+------------------+-------------------+------------+-------------+--------------+---------------------+---------------+---------------+
|              Region|   type|     sum(Latitude)|     sum(Longitude)|sum(Easting)|sum(Northing)|sum(Postcodes)|sum(Active postcodes)|sum(Population)|sum(Households)|
+--------------------+-------+------------------+-------------------+------------+-------------+--------------+---------------------+---------------+---------------+
|          New Forest|   city|          254.1205|           -7.90659|     2147917|       514103|          6330|                 4719|         157569|          68905|
|          North Down|   city|          163.8122|          -16.83903|      500282|      1587640|          2538|                 2538|          83041|          34521|
|            Hereford|village|          156.4724| -8.624130000000001|     1020748|       754231|          1099|                  943|          19355|           8570|
|   

In [70]:
ag = df.groupby('Region', 'type').agg({'Region': 'count'}).withColumnRenamed('count(Region)', 'asd')

In [71]:
ag.show(20)

+--------------------+-------+---+
|              Region|   type|asd|
+--------------------+-------+---+
|          New Forest|   city|  5|
|          North Down|   city|  3|
|            Hereford|village|  3|
|   Llandrindod Wells|village|  6|
|         Mole Valley|village|  1|
|           Hambleton|   town|  1|
|      South Ayrshire|   town|  4|
|       Blaenau Gwent|   city|  3|
|           Rotherham|   town|  1|
|Richmond upon Thames|   town|  3|
|            Somerset|village| 14|
|            Strabane|village|  1|
|               Leeds|village|  6|
|  Telford and Wrekin|   city|  3|
|           Blackburn|   city|  1|
|         Bournemouth|   city|  4|
|              Antrim|   city|  1|
|            Tendring|   town|  2|
|      East Hampshire|   town|  2|
|              Rother|   town|  4|
+--------------------+-------+---+
only showing top 20 rows



In [72]:
import pyspark.sql.functions as pf
ag = df.groupby('Region', 'type').agg(pf.sum('Population').alias('sum_population'), pf.count('PostCodes'))

In [73]:
ag.show()

+--------------------+-------+--------------+----------------+
|              Region|   type|sum_population|count(PostCodes)|
+--------------------+-------+--------------+----------------+
|          New Forest|   city|        157569|               5|
|          North Down|   city|         83041|               3|
|            Hereford|village|         19355|               3|
|   Llandrindod Wells|village|         19566|               6|
|         Mole Valley|village|          4313|               1|
|           Hambleton|   town|         12832|               1|
|      South Ayrshire|   town|         61814|               4|
|       Blaenau Gwent|   city|         75512|               3|
|           Rotherham|   town|         19772|               1|
|Richmond upon Thames|   town|         52176|               3|
|            Somerset|village|         58212|              14|
|            Strabane|village|          7691|               1|
|               Leeds|village|         45159|          

## Partitions

Choose the right partition column. Think about how the cardinality of that column affects how the data gets distributed.

When in doubt hash is better (safer)

In [74]:
# get number of partitions
df.rdd.getNumPartitions()

1

In [75]:
# you can use repartition to redistribuite data
# triggers a shuffle

# repartition by hash
df = df.repartition(10)

# repartition by columns
df = df.repartition('col1','col2')

# repartition by hash and cols
df = df.repartition(10, 'col1', 'col2')

AnalysisException: cannot resolve '`col1`' given input columns: [Active postcodes, Easting, GridRef, Households, Latitude, Longitude, Northing, Population, Post Code, Postcodes, Region, Town/Area, schema, type];;
'RepartitionByExpression ['col1, 'col2], 200
+- Repartition 10, true
   +- Project [Post Code#1866, Latitude#1026, Longitude#1027, Easting#1028, Northing#1029, GridRef#1030, Town/Area#1031, Region#1032, Postcodes#1033, Active postcodes#1034, Population#1035, Households#1036, type#1941, CASE WHEN isnull(Population#1035) THEN cast(null as string) WHEN (Population#1035 < 10000) THEN village WHEN (Population#1035 < 20000) THEN town ELSE city END AS schema#1962]
      +- Project [Post Code#1866, Latitude#1026, Longitude#1027, Easting#1028, Northing#1029, GridRef#1030, Town/Area#1031, Region#1032, Postcodes#1033, Active postcodes#1034, Population#1035, Households#1036, CASE WHEN (Population#1035 < 10000) THEN village WHEN (Population#1035 < 20000) THEN town ELSE city END AS type#1941]
         +- Project [Postcode#1025 AS Post Code#1866, Latitude#1026, Longitude#1027, Easting#1028, Northing#1029, GridRef#1030, Town/Area#1031, Region#1032, Postcodes#1033, Active postcodes#1034, Population#1035, Households#1036]
            +- Relation[Postcode#1025,Latitude#1026,Longitude#1027,Easting#1028,Northing#1029,GridRef#1030,Town/Area#1031,Region#1032,Postcodes#1033,Active postcodes#1034,Population#1035,Households#1036] csv


In [76]:
# you can use coalesce to reduce the number of partitions
# assuming 10 partitions and 5 workers
df = df.coalesce(5)
# will reduce the number of partions without triggering a shuffle
# df.coalesce(20) will not do anything because 20 > 10

In [77]:
df.select('Region').distinct().count()

427

In [78]:
df_200 = df.repartition('Region')

In [79]:
df_200.rdd.getNumPartitions()

200

In [80]:
df_200.show(20)

+---------+--------+---------+-------+--------+--------+--------------------+--------------+---------+----------------+----------+----------+-------+-------+
|Post Code|Latitude|Longitude|Easting|Northing| GridRef|           Town/Area|        Region|Postcodes|Active postcodes|Population|Households|   type| schema|
+---------+--------+---------+-------+--------+--------+--------------------+--------------+---------+----------------+----------+----------+-------+-------+
|      WR1| 52.1974| -2.21749| 385231|  255485|SO852554|           Worcester|     Worcester|      835|             431|      9231|      4850|village|village|
|      WR2| 52.1916| -2.23948| 383726|  254845|SO837548|           Worcester|     Worcester|      902|             711|     29799|     12092|   city|   city|
|      WR3| 52.2159| -2.21017| 385738|  257541|SO857575|           Worcester|     Worcester|      677|             515|     20397|      8738|   city|   city|
|      WR4| 52.2049| -2.18969| 387134|  256314|SO871

In [81]:
df_200.coalesce(427).rdd.getNumPartitions()

200

In [82]:
df_200.coalesce(100).rdd.getNumPartitions()

100

In [83]:
# if you do coalesce(1) only one worker will do the work.
# if you have "unexecuted" transformations repartition(1) is better

In [84]:
# "save" the dataframe in memory or disk to reusse it
df = df.cache()  # <== very important to "store" the result in a new variable

## UDF

In [85]:
from pyspark.sql.functions import udf

In [86]:
def my_custom_fct(x, y):
    if condition:
        return int
    else
        return str
    return x+y

udf_my_custom_fct = udf(my_custom_fct)

In [87]:
df.show(20)

+---------+--------+----------+-------+--------+--------+--------------------+--------------------+---------+----------------+----------+----------+-------+-------+
|Post Code|Latitude| Longitude|Easting|Northing| GridRef|           Town/Area|              Region|Postcodes|Active postcodes|Population|Households|   type| schema|
+---------+--------+----------+-------+--------+--------+--------------------+--------------------+---------+----------------+----------+----------+-------+-------+
|     TA18| 50.8829|  -2.78813| 344653|  109563|ST446095|           Crewkerne|            Somerset|      505|             399|     10136|      4565|   town|   town|
|     IV17| 57.6988|  -4.25616| 265632|  869902|NH656699|              Alness|            Highland|      182|             170|      6444|      2710|village|village|
|      EN8| 51.6964|-0.0332062| 536027|  201570|TL360015|Waltham Cross, Ch...|             Enfield|     1414|             911|     35807|     14936|   city|   city|
|     CA28

In [None]:
df.withColumn('calculated_value', udf_my_custom_fct(df['Population']))