# Introduction to DataFrame

## Initialize Spark

We start by creating a `SparkSession`, the entry point to the DataFrame API.

In [2]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
        .builder
        .appName("intro-to-df")
        .master("local")
        .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/07 10:49:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Loading a CSV file into a DataFrame

Let's see what we have in the `data` folder.

In [7]:
! ls data

linkage.csv


Load the `linkage.csv` file using the `csv` method of the Reader API (`spark.read`).

In [14]:
help(spark.read)

Help on DataFrameReader in module pyspark.sql.readwriter object:

class DataFrameReader(OptionUtils)
 |  DataFrameReader(spark)
 |  
 |  Interface used to load a :class:`DataFrame` from external storage systems
 |  (e.g. file systems, key-value stores, etc). Use :attr:`SparkSession.read`
 |  to access this.
 |  
 |  .. versionadded:: 1.4
 |  
 |  Method resolution order:
 |      DataFrameReader
 |      OptionUtils
 |      builtins.object
 |  
 |  Methods defined here:
 |  
 |  __init__(self, spark)
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLi

In [17]:
help(spark.read.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, unescapedQuoteHandling=None) method of pyspark.sql.readwriter.DataFrameReader instance
    Loads a CSV file and returns the result as a  :class:`DataFrame`.
    
    This function will go through the input once to determine the input schema if
    ``inferSchema`` is enabled. To avoid going through the entire data once, di

In [3]:
linkage = spark.read.csv("data/linkage.csv")

22/03/07 10:49:42 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
22/03/07 10:49:42 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(scavenge), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
22/03/07 10:49:42 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(global, scavenge), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


Let's check what we have.

In [19]:
linkage

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string]

We can look at the first rows of the DataFrame by calling the `show` method.

In [20]:
help(linkage.show)

Help on method show in module pyspark.sql.dataframe:

show(n=20, truncate=True, vertical=False) method of pyspark.sql.dataframe.DataFrame instance
    Prints the first ``n`` rows to the console.
    
    .. versionadded:: 1.3.0
    
    Parameters
    ----------
    n : int, optional
        Number of rows to show.
    truncate : bool or int, optional
        If set to ``True``, truncate strings longer than 20 chars by default.
        If set to a number greater than one, truncates long strings to length ``truncate``
        and align cells right.
    vertical : bool, optional
        If set to ``True``, print output rows vertically (one line
        per column value).
    
    Examples
    --------
    >>> df
    DataFrame[age: int, name: string]
    >>> df.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  2|Alice|
    |  5|  Bob|
    +---+-----+
    >>> df.show(truncate=3)
    +---+----+
    |age|name|
    +---+----+
    |  2| Ali|
    |  5| Bob|
    +---+----+
    >>> df

In [21]:
linkage.show()

+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|  _c0|  _c1|              _c2|         _c3|         _c4|         _c5|    _c6|   _c7|   _c8|   _c9|   _c10|    _c11|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
|37291|53113|0.833333333333333|           ?|           1|           ?|      1|     1|     1|     1|      0|    TRUE|
|39086|47614|                1|           ?|           1|           ?|      1|     1|     1|     1|      1|    TRUE|
|70031|70237|                1|           ?|           1|           ?|      1|     1|     1|     1|      1|    TRUE|
|84795|97439|                1|           ?|           1|           ?|      1|     1|     1|     1|      1|    TRUE|
|36950|42116|                1|           ?|           1|       

**Can anyone spot what's wrong with the above data?**

...

--> **What are these question marks?**

Let's check the schema of our DataFrame.

In [29]:
linkage.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)



**Why is everything a string?**

## Managing Schema and Null Values

We are going to load the DataFrame again, but this time we tell the Reader API a few things:

- the first row is the header
- treat `?` as a null value
- infer the schema from the values

In [26]:
df = (
    spark.read
        .option("header", "true")
        .option("nullValue", "?")
        .option("inferSchema", "true")
        .csv("data/linkage.csv")
)

                                                                                

In [25]:
df.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: double (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: integer (nullable = true)
 |-- cmp_bm: integer (nullable = true)
 |-- cmp_by: integer (nullable = true)
 |-- cmp_plz: integer (nullable = true)
 |-- is_match: boolean (nullable = true)



**Now the schema looks better!**

What about the values?

In [30]:
df.show()

+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|37291|53113|0.833333333333333|        null|         1.0|        null|      1|     1|     1|     1|      0|    true|
|39086|47614|              1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|70031|70237|              1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|84795|97439|              1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|36950|42116|              1.0|        null|         1.0|         1.0|      1|     1|     1|     1|      1|    true|
|42413|48491|              1.0|        null|         1.0|       

## Transformations and Actions

Creating a DataFrame does not cause any distributed computation in the cluster. **A DataFrame is a data set representing an intermediate step in a computation**.

To operate on data in a distributed manner, we have two types of operations: **transformations** and **actions**:

- Transformations are lazily evaluated. They are not computed immediately; instead, Spark records them as a lineage that it uses to optimize the query plan.
- Actions trigger the distributed computation: nothing runs until an action is invoked.

Let's see how many records we have in our DataFrame.

In [31]:
df.count()

574913

We can use the `collect` action to return a Python `list` with all the `Row` objects in our DataFrame.

**This list will reside in the driver's local memory!**

In [32]:
df.collect()

                                                                                

[Row(id_1=37291, id_2=53113, cmp_fname_c1=0.833333333333333, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=None, cmp_sex=1, cmp_bd=1, cmp_bm=1, cmp_by=1, cmp_plz=0, is_match=True),
 Row(id_1=39086, id_2=47614, cmp_fname_c1=1.0, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=None, cmp_sex=1, cmp_bd=1, cmp_bm=1, cmp_by=1, cmp_plz=1, is_match=True),
 Row(id_1=70031, id_2=70237, cmp_fname_c1=1.0, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=None, cmp_sex=1, cmp_bd=1, cmp_bm=1, cmp_by=1, cmp_plz=1, is_match=True),
 Row(id_1=84795, id_2=97439, cmp_fname_c1=1.0, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=None, cmp_sex=1, cmp_bd=1, cmp_bm=1, cmp_by=1, cmp_plz=1, is_match=True),
 Row(id_1=36950, id_2=42116, cmp_fname_c1=1.0, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=1.0, cmp_sex=1, cmp_bd=1, cmp_bm=1, cmp_by=1, cmp_plz=1, is_match=True),
 Row(id_1=42413, id_2=48491, cmp_fname_c1=1.0, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=None, cmp_sex=1, cmp_bd=1, cmp_bm=1

## Write to Disk

What if we want to write the dataframe to disk, say in a different format?

In [35]:
df.write.format("parquet").save("data/linkage-parquet")

                                                                                

In [37]:
! ls data/linkage-parquet

_SUCCESS
part-00000-e73ba34e-da7b-4440-b68e-f9da75527d13-c000.snappy.parquet


## Analyzing Data

All good so far, but we don't load data for the sake of it; we do it because we want to run some analysis.

Let's show the first 5 records of our DataFrame.

In [38]:
df.show(5)

+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|37291|53113|0.833333333333333|        null|         1.0|        null|      1|     1|     1|     1|      0|    true|
|39086|47614|              1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|70031|70237|              1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|84795|97439|              1.0|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|36950|42116|              1.0|        null|         1.0|         1.0|      1|     1|     1|     1|      1|    true|
+-----+-----+-----------------+------------+------------+-------

**What can we spot from here?**

- The first two fields are integer IDs. They represent the patients that were matched in the record.
- The next nine fields are numeric values (int and double) representing match scores on different fields, such as names, sex, birthday, and location. Fields are stored as integers when the only possible values are match (1) or no match (0), and as doubles whenever partial matches are possible.
- The last field is a boolean value indicating whether or not the pair of patient records represented by the line was a match.

**We could use this dataset to build a simple classifier that allows us to predict whether a record will be a match based on the values of the match scores for patient records.**

### Caching

Each time we process data (e.g., by calling the `count` method), Spark re-opens the file, parses the rows, and then executes the requested action.
It does not matter whether we have filtered the data and created a smaller set of records: Spark will execute everything all over again. This is intended: fault tolerance would not be possible without this feature!

We can use the `cache` method to tell Spark to store the DataFrame in memory.

In [39]:
help(df.cache)

Help on method cache in module pyspark.sql.dataframe:

cache() method of pyspark.sql.dataframe.DataFrame instance
    Persists the :class:`DataFrame` with the default storage level (`MEMORY_AND_DISK`).
    
    .. versionadded:: 1.3.0
    
    Notes
    -----
    The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0.



**Do you remember the following question from our last class?**

- Spark is in-memory only. Myth or misconception?

**Can you spot something from above that helps you to answer the question?**

In [40]:
df_cached = df.cache()

The contents of the DataFrame `df_cached` are going to be stored in memory the next time it's computed.

In [41]:
df_cached.count()

                                                                                

574913

In [44]:
df_cached.take(10)

[Row(id_1=37291, id_2=53113, cmp_fname_c1=0.833333333333333, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=None, cmp_sex=1, cmp_bd=1, cmp_bm=1, cmp_by=1, cmp_plz=0, is_match=True),
 Row(id_1=39086, id_2=47614, cmp_fname_c1=1.0, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=None, cmp_sex=1, cmp_bd=1, cmp_bm=1, cmp_by=1, cmp_plz=1, is_match=True),
 Row(id_1=70031, id_2=70237, cmp_fname_c1=1.0, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=None, cmp_sex=1, cmp_bd=1, cmp_bm=1, cmp_by=1, cmp_plz=1, is_match=True),
 Row(id_1=84795, id_2=97439, cmp_fname_c1=1.0, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=None, cmp_sex=1, cmp_bd=1, cmp_bm=1, cmp_by=1, cmp_plz=1, is_match=True),
 Row(id_1=36950, id_2=42116, cmp_fname_c1=1.0, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=1.0, cmp_sex=1, cmp_bd=1, cmp_bm=1, cmp_by=1, cmp_plz=1, is_match=True),
 Row(id_1=42413, id_2=48491, cmp_fname_c1=1.0, cmp_fname_c2=None, cmp_lname_c1=1.0, cmp_lname_c2=None, cmp_sex=1, cmp_bd=1, cmp_bm=1

The `take` method from above is accessing the cached elements of `df_cached` instead of recomputing them from their dependencies (i.e., the "lineage").

== Physical Plan ==
FileScan csv [id_1#117,id_2#118,cmp_fname_c1#119,cmp_fname_c2#120,cmp_lname_c1#121,cmp_lname_c2#122,cmp_sex#123,cmp_bd#124,cmp_bm#125,cmp_by#126,cmp_plz#127,is_match#128] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/cmontemuio/repos/personal/bda-course/coursebook/modules/m2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id_1:int,id_2:int,cmp_fname_c1:double,cmp_fname_c2:double,cmp_lname_c1:double,cmp_lname_c2...


