In [1]:
from IPython.display import display, HTML
display(HTML("<style>.container { width:99% !important; }</style>"))

In [2]:
import pyspark
import pandas as pd
import numpy as np
import matplotlib
import os

In [3]:
hadoop_home = os.environ.get('HADOOP_HOME', None)
spark_home = os.environ.get('PYSPARK_PYTHON', None)
java_home = os.environ.get('JAVA_HOME', None)
print(hadoop_home,'\n',spark_home,'\n',java_home)

F:\my_drive\DataCamp\pyspark\winutils\hadoop-3.0.0 
 python 
 C:\Program Files\Microsoft\jdk-17.0.7.7-hotspot\


In [4]:
sc = pyspark.SparkContext.getOrCreate() 
sc

In [5]:
spark = pyspark.sql.SparkSession(sc)
spark

# 01 DataFrame details

## Intro to data cleaning with Apache Spark

1. Intro to data cleaning with Apache Spark

Welcome to Data Cleaning in Apache Spark with Python. My name is Mike Metzger, I am a Data Engineering Consultant, and I will be your instructor for this course. We will cover what data cleaning is, why it's important, and how to implement it with Spark and Python. Let's get started!

2. What is Data Cleaning?

In this course, we'll define "data cleaning" as preparing raw data for use in processing pipelines. We'll discuss what a pipeline is later on, but for now, it's sufficient to say that data cleaning is a necessary part of any production data system. If your data isn't "clean", it's not trustworthy and could cause problems later on. There are many tasks that could fall under the data cleaning umbrella. A few of these include reformatting or replacing text; performing calculations based on the data; and removing garbage or incomplete data.

3. Why perform data cleaning with Spark?

Most data cleaning systems have two big problems: optimizing performance and organizing the flow of data. A typical programming language (such as Perl, C++, or even standard SQL) may be able to clean data when you have small quantities of data. But consider what happens when you have millions or even billions of pieces of data. Those languages wouldn't be able to process that amount of information in a timely manner. Spark lets you scale your data processing capacity as your requirements evolve. Beyond the performance issues, dealing with large quantities of data requires a process, or pipeline of steps. Spark allows management of many complex tasks within a single framework.

4. Data cleaning example

Here's an example of cleaning a small data set. We're given a table of names, age in years, and a city. Our requirements are for a DataFrame with first and last name in separate columns, the age in months, and which state the city is in. We also want to remove any rows where the data is out of the ordinary. Using Spark transformations, we can create a DataFrame with these properties and continue processing afterwards.

5. Spark Schemas

A primary function of data cleaning is to verify all data is in the expected format. Spark provides a built-in ability to validate datasets with schemas. You may have used schemas before with databases or XML; Spark is similar. A schema defines and validates the number and types of columns for a given DataFrame. A schema can contain many different types of fields - integers, floats, dates, strings, and even arrays or mapping structures. A defined schema allows Spark to filter out data that doesn't conform during read, ensuring expected correctness. In addition, schemas also have performance benefits. Normally a data import will try to infer a schema on read - this requires reading the data twice. Defining a schema limits this to a single read operation.

6. Example Spark Schema

Here is an example schema to the import data from our previous example. First we'll import the pyspark.sql.types library. Next we define the actual StructType list of StructFields, containing an entry for each field in the data. Each StructField consists of a field name, dataType, and whether the data can be null. Once our schema is defined, we can add it into our spark.read.format.load call and process it against our data. The load() method takes two arguments - the filename and a schema. This is where we apply our schema to the data being loaded.

7. Let's practice!

We've gone over a lot of information regarding data cleaning and the importance of dataframe schemas. Let's put that information to use and practice!

### Data cleaning review
There are many benefits for using Spark for data cleaning.

Which of the following is NOT a benefit?

- Spark offers high performance.

- **Spark can only handle thousands of records.**

- Spark allows orderly data flows.

- Spark can use strictly defined schemas while ingesting data.

### Defining a schema
Creating a defined schema helps with data quality and import performance. As mentioned during the lesson, we'll create a simple schema to read in the following columns:

Name
Age
City
The `Name` and `City` columns are `StringType()` and the `Age` column is an `IntegerType()`.

**Instructions**

- Import * from the `pyspark.sql.types` library.
- Define a new schema using the `StructType` method.
- Define a `StructField` for `name`, `age`, and `city`. Each field should correspond to the correct datatype and not be nullable.

In [6]:
# Import the pyspark.sql.types library
from pyspark.sql.types import *

# Define a new schema using the StructType method
people_schema = StructType([
  # Define a StructField for each field
  StructField('name', StringType(), False),
  StructField('age', IntegerType(), False),
  StructField('city', StringType(), False)
])

## Immutability and lazy processing

1. Immutability and Lazy Processing

Welcome back! We've had a quick discussion about data cleaning, data types and schemas. Let's move on to some further Spark concepts - Immutability and Lazy Processing.

2. Variable review

Normally in Python, and most other languages, variables are fully mutable. The values can be changed at any given time, assuming the scope of the variable is valid. While very flexible, this does present problems anytime there are multiple concurrent components trying to modify the same data. Most languages work around these issues using constructs like mutexes, semaphores, etc. This can add complexity, especially with non-trivial programs.

3. Immutability

Unlike typical Python variables, Spark Data Frames are immutable. While not strictly required, immutability is often a component of functional programming. We won't go into everything that implies here, but understand that Spark is designed to use immutable objects. Practically, this means Spark Data Frames are defined once and are not modifiable after initialization. If the variable name is reused, the original data is removed (assuming it's not in use elsewhere) and the variable name is reassigned to the new data. While this seems inefficient, it actually allows Spark to share data between all cluster components. It can do so without worry about concurrent data objects.

4. Immutability Example

This is a quick example of the immutability of data frames in Spark. It's OK if you don't understand the actual code, this example is more about the concepts of what happens. First, we create a data frame from a CSV file called voterdata.csv. This creates a new data frame definition and assigns it to the variable name voter_df. Once created, we want to do two further operations. The first is to create a fullyear column by using a 2-digit year present in the data set and adding 2000 to each entry. This does not actually change the data frame at all. It copies the original definition, adds the transformation, and assigns it to the voter_df variable name. Our second operation is similar - now we want to drop the original year column from the data frame. Again, this copies the definition, adds a transformation and reassigns the variable name to this new object. The original objects are destroyed. Please note that the original year column is now permanently gone from this instance, though not from the underlying data (ie, you could simply reload it to a new dataframe if desired).

5. Lazy Processing

You may be wondering how Spark does this so quickly, especially on large data sets. Spark can do this because of something called lazy processing. Lazy processing in Spark is the idea that very little actually happens until an action is performed. In our previous example, we read a CSV file, added a new column, and deleted another. The trick is that no data was actually read / added / modified, we only updated the instructions (aka, Transformations) for what we wanted Spark to do. This functionality allows Spark to perform the most efficient set of operations to get the desired result. The code example is the same as the previous slide, but with the added count() method call. This classifies as an action in Spark and will process all the transformation operations.

6. Let's practice!

These concepts can be a little tricky to grasp without some examples. Let's practice these ideas in the coming exercises.

### Immutability review
You’ve just seen that immutability and lazy processing are fundamental concepts in the way Spark handles data. But why would Spark use immutable data frames to begin with?


- To add complexity to your Spark tasks.

- **To efficiently handle data throughout the cluster.**

- To easily modify variable values as needed.

- To conserve storage space.

## Using lazy processing
Lazy processing operations will usually return in about the same amount of time regardless of the actual quantity of data. Remember that this is due to Spark not performing any transformations until an action is requested.

For this exercise, we'll be defining a Data Frame (`aa_dfw_df`) and add a couple transformations. Note the amount of time required for the transformations to complete when defined vs when the data is actually queried. These differences may be short, but they will be noticeable. When working with a full Spark cluster with larger quantities of data the difference will be more apparent.

**Instructions**

- Load the Data Frame.
- Add the transformation for `F.lower()` to the Destination Airport column.
- Drop the Destination Airport column from the Data Frame `aa_dfw_df`. Note the time for these operations to complete.
- Show the Data Frame, noting the time difference for this action to complete.

In [7]:
%timeit
# Load the CSV file
aa_dfw_df = spark.read.format('csv').options(Header=True).load('AA_DFW_2017_Departures_Short.csv.gz')


In [8]:
from pyspark.sql import functions as F

In [9]:
%timeit
# Add the airport column using the F.lower() method
aa_dfw_df = aa_dfw_df.withColumn('airport', F.lower(aa_dfw_df['Destination Airport']))

In [10]:
%timeit
# Drop the Destination Airport column
aa_dfw_df = aa_dfw_df.drop(aa_dfw_df['Destination Airport'])

In [11]:
%timeit
# Show the DataFrame
aa_dfw_df.show()

+-----------------+-------------+-----------------------------+-------+
|Date (MM/DD/YYYY)|Flight Number|Actual elapsed time (Minutes)|airport|
+-----------------+-------------+-----------------------------+-------+
|       01/01/2017|         0005|                          537|    hnl|
|       01/01/2017|         0007|                          498|    ogg|
|       01/01/2017|         0037|                          241|    sfo|
|       01/01/2017|         0043|                          134|    dtw|
|       01/01/2017|         0051|                           88|    stl|
|       01/01/2017|         0060|                          149|    mia|
|       01/01/2017|         0071|                          203|    lax|
|       01/01/2017|         0074|                           76|    mem|
|       01/01/2017|         0081|                          123|    den|
|       01/01/2017|         0089|                          161|    slc|
|       01/01/2017|         0096|                           84| 

## Understanding Parquet

1. Understanding Parquet

Welcome back! As we've seen, Spark can read in text and CSV files. While this gives us access to many data sources, it's not always the most convenient format to work with. Let's take a look at a few problems with CSV files.

2. Difficulties with CSV files

Some common issues with CSV files include: The schema is not defined: there are no data types included, nor column names (beyond a header row). Using content containing a comma (or another delimiter) requires escaping. Using the escape character within content requires even further escaping. The available encoding formats are limited depending on the language used.

3. Spark and CSV files

In addition to the issues with CSV files in general, Spark has some specific problems processing CSV data. CSV files are quite slow to import and parse. The files cannot be shared between workers during the import process. If no schema is defined, all data must be read before a schema can be inferred. Spark has feature known as predicate pushdown. Basically, this is the idea of ordering tasks to do the least amount of work. Filtering data prior to processing is one of the primary optimizations of predicate pushdown. This drastically reduces the amount of information that must be processed in large data sets. Unfortunately, you cannot filter the CSV data via predicate pushdown. Finally, Spark processes are often multi-step and may utilize an intermediate file representation. These representations allow data to be used later without regenerating the data from source. Using CSV would instead require a significant amount of extra work defining schemas, encoding formats, etc.

4. The Parquet Format

Parquet is a compressed columnar data format developed for use in any Hadoop based system. This includes Spark, Hadoop, Apache Impala, and so forth. The Parquet format is structured with data accessible in chunks, allowing efficient read / write operations without processing the entire file. This structured format supports Spark's predicate pushdown functionality, providing significant performance improvement. Finally, Parquet files automatically include schema information and handle data encoding. This is perfect for intermediary or on-disk representation of processed data. Note that Parquet files are a binary file format and can only be used with the proper tools. This is in contrast to CSV files which can be edited with any text editor.

5. Working with Parquet

Interacting with Parquet files is very straightforward. To read a parquet file into a Data Frame, you have two options. The first is using the `spark.read.format` method we've seen previously. The Data Frame, df=spark.read.format('parquet').load('filename.parquet') The second option is the shortcut version: The Data Frame, df=spark.read.parquet('filename.parquet') Typically, the shortcut version is the easiest to use but you can use them interchangeably. Writing parquet files is similar, using either: df.write.format('parquet').save('filename.parquet') or df.write.parquet('filename.parquet') The long-form versions of each permit extra option flags, such as when overwriting an existing parquet file.

6. Parquet and SQL

Parquet files have various uses within Spark. We've discussed using them as an intermediate data format, but they also are perfect for performing SQL operations. To perform a SQL query against a Parquet file, we first need to create a Data Frame via the spark.read.parquet method. Once we have the Data Frame, we can use the createOrReplaceTempView() method to add an alias of the Parquet data as a SQL table. Finally, we run our query using normal SQL syntax and the spark.sql method. In this case, we're looking for all flights with a duration under 100 minutes. Because we're using Parquet as the backing store, we get all the performance benefits we've discussed previously (primarily defined schemas and the available use of predicate pushdown).

7. Let's Practice!

You've seen a bit about what a Parquet file is and why we'd want to use them. Now, let's practice working with Parquet files.

## Saving a DataFrame in Parquet format
When working with Spark, you'll often start with CSV, JSON, or other data sources. This provides a lot of flexibility for the types of data to load, but it is not an optimal format for Spark. The Parquet format is a columnar data store, allowing Spark to use predicate pushdown. This means Spark will only process the data necessary to complete the operations you define versus reading the entire dataset. This gives Spark more flexibility in accessing the data and often drastically improves performance on large datasets.

In this exercise, we're going to practice creating a new Parquet file and then process some data from it.

The spark object and the `df1` and `df2` DataFrames have been setup for you.

**Instructions**

- View the row count of `df1` and `df2`.
- Combine `df1` and `df2` in a new DataFrame named `df3` with the union method.
- Save `df3` to a parquet file named `AA_DFW_ALL.parquet`.
- Read the `AA_DFW_ALL.parquet` file and show the count.

In [12]:
df1 = spark.read.csv('AA_DFW_2014_Departures_Short.csv.gz')
df2 = spark.read.csv('AA_DFW_2015_Departures_Short.csv.gz')

In [13]:
# View the row count of df1 and df2
print("df1 Count: %d" % df1.count())
print("df2 Count: %d" % df2.count())

# Combine the DataFrames into one
df3 = df1.union(df2)

# Save the df3 DataFrame in Parquet format
df3.write.parquet('AA_DFW_ALL.parquet', mode='overwrite')

# Read the Parquet file into a new DataFrame and run a count
print(spark.read.parquet('AA_DFW_ALL.parquet').count())

df1 Count: 157199
df2 Count: 146559


Py4JJavaError: An error occurred while calling o50.parquet.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:640)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:789)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


### SQL and Parquet
Parquet files are perfect as a backing data store for SQL queries in Spark. While it is possible to run the same queries directly via Spark's Python functions, sometimes it's easier to run SQL queries alongside the Python options.

For this example, we're going to read in the Parquet file we created in the last exercise and register it as a SQL table. Once registered, we'll run a quick query against the table (aka, the Parquet file).

The spark object and the `AA_DFW_ALL.parquet` file are available for you automatically.

**Instructions**

- Import the `AA_DFW_ALL.parquet` file into flights_df.
- Use the `createOrReplaceTempView` method to alias the flights table.
- Run a Spark SQL query against the flights table.

In [None]:
# Read the Parquet file into flights_df
flights_df = spark.read.parquet('AA_DFW_ALL.parquet')

# Register the temp table
flights_df.createOrReplaceTempView('flights')

# Run a SQL query of the average flight duration
avg_duration = spark.sql('SELECT avg(flight_duration) from flights').collect()[0]
print('The average flight time is: %d' % avg_duration)