## Reading a CSV file

Loading the data is the first step in building a data transformation pipeline. “Comma-separated values” (CSV) is a commonly used file format for data exchange. You’re now going to use Spark to read a CSV file.

You’ve seen in the videos how to load landing/prices.csv. Now let’s do the same for landing/ratings.csv, step by step. Remember, the actual data lake is made available to you under ~/workspace/mnt/data_lake.

A SparkSession named spark has already been loaded for you.

### Instructions
    - Create a DataFrameReader object using the spark.read property.
    - Make the reader object use the header of the CSV file to name the columns automatically, by passing in the correct keyword arguments to the reader’s .options() method.

In [None]:
# Read a csv file and set the headers
df = (spark.read
      .options(header="true")
      .csv("/home/repl/workspace/mnt/data_lake/landing/ratings.csv"))

df.show()

## Defining a schema

In the last exercise, you read a CSV file using PySpark. Because you didn’t define a schema, all column values were parsed as strings, which can be cumbersome and inefficient to process. You are usually better off defining the data types in a schema yourself.

To do this, you use classes from the pyspark.sql.types module. Its StructType() class expects a list of StructField() instances, each of which adds one field to the schema. Various other types, such as ByteType() and IntegerType(), are also defined in this module and can be used to specify the data type of each field. In this exercise, all of these classes have been imported for you.

In the ratings.csv dataset from the previous exercise, the rating values in the columns “absorption_rate” and “comfort” are expressed on a scale from 1 to 5, like with Amazon’s web store. Because of that, they easily fit into a ByteType(), which can hold values between -128 and 127. The other columns are better left as StringType()s.

Feel free to explore the previous Spark DataFrame in the console using df.show() so you can map each column to the correct type.
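
The claim that ratings on a 1-to-5 scale fit into a ByteType() can be checked with a little arithmetic. The sketch below is plain Python rather than Spark; the helper `fits_in_byte` and the sample values are hypothetical and only illustrate the range of a signed 8-bit integer.

```python
# Bounds of a signed 8-bit integer, the range quoted above for ByteType().
BYTE_MIN = -(2 ** 7)     # -128
BYTE_MAX = 2 ** 7 - 1    # 127


def fits_in_byte(value: int) -> bool:
    """Return True when value is representable as a signed 8-bit integer."""
    return BYTE_MIN <= value <= BYTE_MAX


# Hypothetical sample of ratings on the 1-to-5 scale.
sample_ratings = [1, 2, 3, 4, 5]
all_fit = all(fits_in_byte(rating) for rating in sample_ratings)

print(all_fit)            # every rating fits
print(fits_in_byte(128))  # one past the upper bound does not
```

A column whose values could exceed 127 would need a wider type, such as IntegerType().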

### Instructions
    - Define the schema for the spreadsheet that has the columns “brand”, “model”, “absorption_rate” and “comfort”, in that order.
    - Pass the predefined schema while loading the CSV file using the .schema() method.

In [None]:
# Define the schema
schema = StructType([
  StructField("brand", StringType(), nullable=False),
  StructField("model", StringType(), nullable=False),
  StructField("absorption_rate", ByteType(), nullable=True),
  StructField("comfort", ByteType(), nullable=True)
])

better_df = (spark
             .read
             .options(header="true")
             # Pass the predefined schema to the Reader
             .schema(schema)
             .csv("/home/repl/workspace/mnt/data_lake/landing/ratings.csv"))
pprint(better_df.dtypes)

## Removing invalid rows

Data scientists spend a lot of their time cleaning data, because most data sources they work with are not ready for analytics. Hence, the second step in the data transformation pipeline is cleaning the data.

In the previous exercise, you dealt with incorrect data types. In this exercise, you will use Spark to clean a DataFrame that contains invalid rows, a common problem with real data.

You’ve seen in the videos how to clean landing/prices_with_invalid_rows.csv. Now do the same for landing/ratings_with_invalid_rows.csv, step by step.

A SparkSession named spark has already been loaded for you.

### Instructions
    - Remove any invalid rows by passing the correct keyword (and associated value) to the reader’s .options() method.

In [None]:
# Specify the option to drop invalid rows
ratings = (spark
           .read
           .options(header=True, mode="DROPMALFORMED")
           .csv("/home/repl/workspace/mnt/data_lake/landing/ratings_with_invalid_rows.csv"))
ratings.show()
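
To build intuition for what dropping malformed rows means, here is a rough plain-Python analogy using the standard csv module. It is not how Spark implements DROPMALFORMED: it only assumes that a row is invalid when its number of fields differs from the header, whereas Spark's own rules are richer (for example, values that fail to parse under a supplied schema). The sample data is made up.

```python
import csv
import io

# Hypothetical CSV content: the third data line is missing fields and the
# fourth has one field too many.
raw = """brand,model,absorption_rate,comfort
Diapers-R-Us,6months,2,3
Nappy-k,2months,3,4
Pampers,3months
Huggies,newborn,3,5,extra
"""

reader = csv.reader(io.StringIO(raw))
header = next(reader)

# Keep only the rows whose field count matches the header.
valid_rows = [row for row in reader if len(row) == len(header)]

for row in valid_rows:
    print(dict(zip(header, row)))
```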

## Filling unknown data

What if almost all fields in a row are valid, except one or two? What do you do in this case? Sometimes these fields are not critical and can be left empty; sometimes they can be given a default value. Let’s try to pass a default value.

You’ve seen in the videos how to clean landing/prices_with_incomplete_rows.csv. Now let’s do the same for landing/ratings_with_incomplete_rows.csv, step by step.

A SparkSession named spark has already been loaded for you, as well as the DataFrame ratings.

### Instructions
    - Fill the incomplete rows by supplying the default numeric value of 4 for the comfort column.

In [None]:
print("BEFORE")
ratings.show()

print("AFTER")
# Replace nulls with arbitrary value on column subset
ratings = ratings.fillna(4, subset=["comfort"])
ratings.show()

## Conditionally replacing values

Another common situation is that some values need to be replaced, for example because they don’t make sense, as we saw in the video. You can target the column to transform with the .withColumn() method, replace the values that meet a given condition using the pyspark.sql.functions.when() function, and use the .otherwise() method to decide what happens to the values that don’t meet it, such as leaving them unaltered.

In this exercise, you will do just that by transforming the “comfort” column of the ratings DataFrame (already loaded) by replacing its numeric values with the string "sufficient" when the comfort value is bigger than 3, and "insufficient" when it is not.

### Instructions
    - Use the .withColumn() method to relabel the column named “comfort”.
    - Use the when() function to replace values of the “comfort” column larger than 3 with the string "sufficient".
    - Use the .otherwise() method to replace remaining values with "insufficient".

In [None]:
from pyspark.sql.functions import col, when

# Add/relabel the column
categorized_ratings = ratings.withColumn(
    "comfort",
    # Express the condition in terms of column operations
    when(col("comfort") > 3, "sufficient").otherwise("insufficient"))

categorized_ratings.show()
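
The branching logic of when() and .otherwise() can be sketched in plain Python. The function `categorize_comfort` below is hypothetical, and the handling of missing values rests on the assumption that Spark treats a null condition as not met, as SQL's CASE WHEN does, so that nulls end up in the .otherwise() branch.

```python
from typing import Optional


def categorize_comfort(comfort: Optional[int]) -> str:
    """Label a comfort rating, mirroring when(...).otherwise(...)."""
    # A missing value does not satisfy the condition, so it falls through
    # to the default branch (assumed to mirror SQL CASE WHEN semantics).
    if comfort is not None and comfort > 3:
        return "sufficient"
    return "insufficient"


# Hypothetical comfort values, including one missing rating.
labels = [categorize_comfort(value) for value in [5, 3, None, 4]]
print(labels)
```

If missing ratings should not be labeled "insufficient", they need to be filled or filtered before the conditional replacement.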

## Selecting and renaming columns

Transformations are, after ingestion, the next step in data engineering pipelines. Data gets transformed, because certain insights need to be derived. Clear column names help in achieving that goal.

You’ve seen in the videos how to select and rename columns of the landing/prices.csv file. Now do the same for landing/ratings.csv, step by step.

A SparkSession named spark has already been loaded for you and the CSV file was read into a DataFrame called ratings.

### Instructions
    - Select the columns “brand”, “model” and “absorption_rate” from the ratings DataFrame.
    - Rename the “absorption_rate” column to “absorbency”.
    - Show only the distinct values of the resulting DataFrame.

In [None]:
from pyspark.sql.functions import col

# Select the columns and rename the "absorption_rate" column
result = ratings.select([col("brand"),
                         col("model"),
                         col("absorption_rate").alias("absorbency")])

# Show only unique values
result.distinct().show()

## Grouping and aggregating data

Often you will want to compute a metric over a set of values that share a common characteristic, like the average price of a house in a certain region. To achieve this, you would need to group the data by region and compute an aggregate metric on that subgroup of data.

We’ve already seen a couple of these aggregation metrics in the video, applied to landing/prices.csv. We’ll inspect a few more now and apply them to ~/workspace/mnt/data_lake/landing/purchased.csv. In particular, you’ll use the following aggregation functions from pyspark.sql.functions: avg() to compute the average value of some column in a group, stddev_samp() to compute the sample standard deviation, and max() (which we alias as sfmax so as not to shadow Python’s built-in max()) to retrieve the largest value of some column in a group.

### Instructions
    - Use the .groupBy() method to group the data by the “Country” column.
    - In these groups, compute the average of the “Salary” column and name the resulting column “average_salary”.
    - Compute the standard deviation of the “Salary” column in each group in the same aggregation.
    - Retrieve the largest “Salary” in each group, in the same aggregation, and name the resulting column “highest_salary”.

In [None]:
from pyspark.sql.functions import col, avg, stddev_samp, max as sfmax

aggregated = (purchased
              # Group rows by 'Country'
              .groupBy(col('Country'))
              .agg(
                # Calculate the average salary per group and rename
                avg('Salary').alias('average_salary'),
                # Calculate the standard deviation per group
                stddev_samp('Salary'),
                # Retain the highest salary per group and rename
                sfmax('Salary').alias('highest_salary')
              )
             )

aggregated.show()
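
To make clear what each aggregate computes per group, here is the same calculation in plain Python on a handful of made-up (Country, Salary) records. It is only a sketch for sanity-checking the three metrics; the data and the `summary` structure are hypothetical and not part of the exercise.

```python
from collections import defaultdict
from statistics import mean, stdev

# Hypothetical (Country, Salary) records.
records = [
    ("France", 72000.0),
    ("France", 58000.0),
    ("Spain", 48000.0),
    ("Spain", 52000.0),
    ("Spain", 61000.0),
]

# Group the salaries by country, like .groupBy("Country").
salaries_by_country = defaultdict(list)
for country, salary in records:
    salaries_by_country[country].append(salary)

# Compute the three aggregates for every group.
summary = {
    country: {
        "average_salary": mean(salaries),
        # statistics.stdev is the sample standard deviation (divides by n - 1)
        # and needs at least two values per group.
        "stddev_samp": stdev(salaries),
        "highest_salary": max(salaries),
    }
    for country, salaries in salaries_by_country.items()
}

for country, metrics in summary.items():
    print(country, metrics)
```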

## Creating a deployable artifact

In the video, you saw how to run a PySpark program locally by first zipping your code. This packaging step becomes more important when your code consists of many modules. Packaging in the zip format is done by navigating to the root folder of your pipeline using the cd command and running the following command:

```bash
zip --recurse-paths zip_file.zip pipeline_folder
```

In this exercise, you will first create a zipped archive to pass along as a dependency of your Spark job. That gives you one of the prerequisites for running a Spark job locally, which, as you now know, is a good way to resolve simple issues quickly.

### IDE Exercise Instruction
    - In the shell, navigate to /home/repl/workspace/spark_pipelines/.
    - There, create the pydiaper.zip archive using the bash command zip and its options. The compressed archive should contain all the files in the pydiaper folder (including that folder, because that’s the package imported by various modules), and those in all of the directories below.

- You need to run these commands in the terminal:

```bash
$ cd spark_pipelines
$ zip --recurse-paths pydiaper.zip pydiaper
```
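
The requirement that the archive contain the pydiaper folder itself, and not just its contents, can be verified by listing the archive's entries. The sketch below does this in Python on a throwaway directory; the file layout is a hypothetical stand-in for the real package, and shutil.make_archive is used here in place of the zip command.

```python
import os
import shutil
import tempfile
import zipfile

# Build a throwaway, hypothetical layout: <root>/pydiaper/cleaning/clean_ratings.py
root = tempfile.mkdtemp()
package_dir = os.path.join(root, "pydiaper", "cleaning")
os.makedirs(package_dir)
for path in (
    os.path.join(root, "pydiaper", "__init__.py"),
    os.path.join(package_dir, "__init__.py"),
    os.path.join(package_dir, "clean_ratings.py"),
):
    with open(path, "w") as handle:
        handle.write("")

# root_dir is the folder paths are resolved from; base_dir is the folder
# stored in the archive, so every entry keeps its "pydiaper/" prefix.
archive_path = shutil.make_archive(
    os.path.join(root, "pydiaper"), "zip", root_dir=root, base_dir="pydiaper"
)

with zipfile.ZipFile(archive_path) as archive:
    names = archive.namelist()

print(names)
shutil.rmtree(root)  # clean up the temporary directory
```

Every entry starting with `pydiaper/` is what allows `import pydiaper` to resolve once the archive is on the module search path.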

## Submitting your Spark job

With the dependencies of a job ready to be distributed across a cluster’s nodes, you can submit a job to a cluster easily. In this exercise, you'll be testing the job locally.

To run a PySpark application locally, you need to call:

```bash
spark-submit --py-files PY_FILES MAIN_PYTHON_FILE
```

with PY_FILES being either a zipped archive, a Python egg or separate Python files that will be placed on the PYTHONPATH environment variable of your cluster's nodes. The MAIN_PYTHON_FILE should be the entry point of your application.
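
Why a zipped archive works as a dependency comes down to a standard Python feature: a zip file placed on the module search path can be imported from directly. The sketch below shows only that Python-side mechanism, not how Spark distributes files to nodes; the package `demo_pkg` and its function are hypothetical.

```python
import importlib
import os
import sys
import tempfile
import zipfile

# Write a tiny, hypothetical package straight into a zip archive.
workdir = tempfile.mkdtemp()
archive_path = os.path.join(workdir, "demo_pkg.zip")
with zipfile.ZipFile(archive_path, "w") as archive:
    archive.writestr("demo_pkg/__init__.py", "")
    archive.writestr(
        "demo_pkg/greeting.py",
        "def hello():\n    return 'hello from the archive'\n",
    )

# Putting the archive itself on the search path makes its packages importable.
sys.path.insert(0, archive_path)
greeting = importlib.import_module("demo_pkg.greeting")
message = greeting.hello()
print(message)
```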

In this particular exercise, the path of the zipped archive is spark_pipelines/pydiaper/pydiaper.zip whereas the path to your application entry point is spark_pipelines/pydiaper/pydiaper/cleaning/clean_ratings.py.

### IDE Exercise Instruction
    - Submit the Spark job to clean the user ratings.

- You need to run this command in the terminal:

```bash
$ spark-submit --py-files spark_pipelines/pydiaper/pydiaper.zip ./spark_pipelines/pydiaper/pydiaper/cleaning/clean_ratings.py
```