<a href="https://colab.research.google.com/github/austinAbraham/hello-world/blob/master/pyspark_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Mango DE PySpark Test

This exercise is designed to test a candidate's ability to translate requirements into a very small ETL script using PySpark.

The first set of cells in this notebook installs Java and Spark, clones a repo containing a data-generating function, and starts a Spark session. A common issue with these setup instructions is a slow response from the mirror used to download Spark, so an alternative mirror is provided (commented out in the code cell). However, you may need to shop around for a mirror that is more appropriate for your location. Candidates are expected to debug installation and setup issues themselves.
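If the first mirror is slow or down, the "shop around" step can be automated by trying several URLs in order. This is a minimal sketch, not part of the original setup: `SPARK_URLS` and `download_first_available` are hypothetical names, the first two URLs are the mirrors from the setup cell, and the last one assumes the Apache release archive (archive.apache.org), which keeps old Spark releases but can be slow.

```python
import urllib.request

# Candidate download locations for the Spark 3.0.3 build used in this notebook.
SPARK_URLS = [
    "https://apache.mirrors.nublue.co.uk/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz",
    "https://mirrors.gethosted.online/apache/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz",
    # Assumed fallback: the Apache archive of past releases.
    "https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz",
]


def download_first_available(urls, dest, fetch=urllib.request.urlretrieve):
    """Try each URL in turn and return the first one that downloads to dest.

    `fetch(url, dest)` defaults to urllib's urlretrieve; it is a parameter so
    the fallback logic can be exercised without network access.
    """
    errors = []
    for url in urls:
        try:
            fetch(url, dest)
            return url
        except OSError as exc:  # URLError and HTTPError are OSError subclasses
            errors.append(f"{url}: {exc}")
    raise RuntimeError("no mirror worked:\n" + "\n".join(errors))
```

In the setup cell this would run as `download_first_available(SPARK_URLS, "spark-3.0.3-bin-hadoop2.7.tgz")` before the `tar` step.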

Instructions for the test can be found below the setup cells.

If you are new to Google Colab, note that there is a file browser button on the left. As you run through the setup cells and generate data for the test, files will be added there. The home directory for this workspace is /content, and you can mount a personal Google Drive there if you want the added files to persist. For more information on Colab, see the tutorial here: https://colab.research.google.com/notebooks/welcome.ipynb


In [None]:
# Install Java and Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -qN https://apache.mirrors.nublue.co.uk/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
#!wget -qN https://mirrors.gethosted.online/apache/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

!tar xf spark-3.0.3-bin-hadoop2.7.tgz

In [None]:
# If you have issues, uncomment the line below and run it, then re-run the install cell above
# !rm -rf spark-3.0.3-bin-hadoop2.7*

In [None]:
!git clone https://github.com/MangoTheCat/de_tech_test_pyspark.git functions > /dev/null 2>&1

In [None]:
!pip install -q findspark

In [None]:
# Configure Java / Spark environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

In [None]:
# start a spark session to check that this is all working
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

## Scenario overview

An organisation receives files, usually daily but with some gaps, which are saved to ~/data/main. The task is to do some basic cleaning and transformation of these files and to write them to a separate dataset. Imagine that the output you produce here will be registered as an external table in a Hive database, which an analysis team will query using a SQL tool.
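Because files usually arrive daily but with gaps, it can help to check which dates are actually present before processing. A minimal sketch, assuming the `data/main/<yyyy-mm-dd>/test_data.csv` layout that `read_data` in the solution cell reads from; `available_dates` and `missing_dates` are hypothetical helpers, not part of the test repo.

```python
from datetime import date, timedelta
from pathlib import Path


def available_dates(main_dir):
    """Sorted dates that have a test_data.csv under main_dir.

    Assumes one folder per day named yyyy-mm-dd, each holding test_data.csv.
    """
    found = []
    for sub in Path(main_dir).iterdir():
        if (sub / "test_data.csv").is_file():
            try:
                found.append(date.fromisoformat(sub.name))
            except ValueError:
                continue  # skip folders that are not named as dates
    return sorted(found)


def missing_dates(dates):
    """Calendar days between the first and last (sorted) dates with no file."""
    if not dates:
        return []
    present = set(dates)
    span = (dates[-1] - dates[0]).days
    days = (dates[0] + timedelta(days=i) for i in range(span + 1))
    return [d for d in days if d not in present]
```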

Files are generated in the first code cell below. Running this cell will produce a new file with random data for today's date. It will throw an error if a file already exists for today's date. Each file will be relatively small due to the restrictions of this environment, but treat these files as though they are large.

The required transformations are detailed below. Feel free to use cells in this notebook while developing your solution, but you should develop a solution that can sit in a separate .py file. You should also include unit tests as appropriate. Remember that, if you have not mounted a Google Drive in your Colab runtime, any external files you create here will not persist. Please add a text cell describing any assumptions you have made about how your script should be run.
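One way to make the "how the script should be run" assumptions explicit is to give the standalone .py file a small command-line interface. This is a sketch only: the option names, defaults and the script name `etl.py` are hypothetical choices, with defaults copied from the paths the solution cell uses.

```python
import argparse
from datetime import datetime


def parse_args(argv=None):
    """Parse command-line options for a standalone version of the ETL script.

    Option names and defaults are hypothetical; "" for --date means today.
    """
    parser = argparse.ArgumentParser(description="Clean and aggregate one day of test data.")
    parser.add_argument("--date", default="", help="data date as yyyy-mm-dd; defaults to today")
    parser.add_argument("--lookup", default="./data/lookup.csv", help="path to the lookup CSV")
    parser.add_argument("--output", default="./data/output", help="output directory")
    args = parser.parse_args(argv)
    if not args.date:
        args.date = datetime.today().strftime("%Y-%m-%d")
    # Fail fast on a malformed date rather than on a missing input path later.
    datetime.strptime(args.date, "%Y-%m-%d")
    return args
```

From a shell this would be `python etl.py --date 2021-07-26`. Inside a notebook, pass an explicit list such as `parse_args([])`, because with `argv=None` argparse would read the kernel's own command-line arguments.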

In [None]:
from functions.generate_data import generate_data
generate_data()


## Requirements

1. Some values in field1 are missing. We know the value columns for these records are faulty, so we should remove these rows entirely.

2. Field2 also has some missing data, but we can work out what those values should be. If field3 is between 8 and 12, these missing values should be equal to 1. Otherwise, they should be 2.

3. We need to add an extra dimension based on fields 1 and 2. A lookup csv has been provided in the data directory. You should add the lookup_val field to the dataset as a column named 'new_dimension'.

4. If either field1 or field2 has a value of 10, the new_dimension column should have a value of 'XX'.

5. The analysis team will often run queries over a specific year or a specific year/month. Take this into consideration when designing the output table structure.

6. For analysis purposes, the team won't need field3. Please remove this field and aggregate on the remaining fields. All the value columns should be summed.

7. A distinct count of the values in field3 should be included in the output.

8. The analysts would also like a column added showing the proportion of val3 for each value of field4 over the total of val3 for the remaining dimension columns.

9. It should be rare, but we may sometimes receive a replacement file for an old period (which will overwrite the old file in the data/main directory). We need to be able to use the transformation code on an ad hoc basis to update that data in our output. The cluster we are working on is very busy, so we don't want this to take longer than it has to.

10. Write your output to the home directory in a format and structure that you deem suitable.
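Requirements 1-4 and 6-8 can be pinned down with a small plain-Python reference model that runs without Spark, which is handy as an oracle for unit tests of the PySpark code. A sketch under stated assumptions: rows are dicts, missing values are `None`, the lookup is a dict keyed by `(field1, field2)`, "between 8 and 12" is read as inclusive, and requirement 8's proportion is each field4 group's share of the total val3 for the same field1, field2 and new_dimension. `clean_rows` and `aggregate` are hypothetical names.

```python
from collections import defaultdict


def clean_rows(rows, lookup):
    """Requirements 1-4 on a list of dict rows (None marks a missing value)."""
    out = []
    for r in rows:
        if r["field1"] is None:  # requirement 1: drop rows with no field1
            continue
        r = dict(r)  # copy so the caller's rows are not modified
        if r["field2"] is None:  # requirement 2: impute field2 from field3
            f3 = r["field3"]
            r["field2"] = 1 if f3 is not None and 8 <= f3 <= 12 else 2
        key = (r["field1"], r["field2"])
        r["new_dimension"] = lookup.get(key)  # requirement 3 (None if no match)
        if 10 in key:  # requirement 4: field1 or field2 equal to 10
            r["new_dimension"] = "XX"
        out.append(r)
    return out


def aggregate(rows):
    """Requirements 6-8: drop field3, sum values, count distinct field3,
    and add each field4 value's share of val3 within the other dimensions."""
    dims = ("field1", "field2", "field4", "new_dimension")
    groups = {}
    for r in rows:
        key = tuple(r[d] for d in dims)
        g = groups.setdefault(key, {"val1": 0.0, "val2": 0.0, "val3": 0.0, "field3": set()})
        for v in ("val1", "val2", "val3"):
            g[v] += r[v]
        if r["field3"] is not None:  # like SQL count(distinct), ignore nulls
            g["field3"].add(r["field3"])

    # total val3 per (field1, field2, new_dimension), i.e. across field4 values
    totals = defaultdict(float)
    for (f1, f2, _f4, nd), g in groups.items():
        totals[(f1, f2, nd)] += g["val3"]

    result = []
    for (f1, f2, f4, nd), g in groups.items():
        total = totals[(f1, f2, nd)]
        result.append({
            "field1": f1, "field2": f2, "field4": f4, "new_dimension": nd,
            "val1": g["val1"], "val2": g["val2"], "val3": g["val3"],
            "field3_distinct_count": len(g["field3"]),
            "val3_proportion": g["val3"] / total if total else None,
        })
    return result
```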

In [84]:
from datetime import datetime
from pathlib import Path

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import expr, when, col, lit
from pyspark.sql.types import DecimalType

# reuse the notebook's session, or start one when run as a standalone .py file
spark = SparkSession.builder.getOrCreate()




"""
read_data(data_date)

parameters : 
  - data_date, date string expected in yyyy-mm-dd format

If data_date is not null the current date will be used to read the etxt file,
otherwise the data file from the specified date will be read.

The data frame will be returned

"""
def read_data(data_date):
  # set data_date
  date=data_date
  if data_date == "":
    date = datetime.today().strftime('%Y-%m-%d')
    print(date)
  #  reading data
  data = spark.read.csv('./data/main/' + date + '/test_data.csv', header=True)

  # returning dataframe
  return data



"""
processing(df)

parameters : 
  - df, data frame to apply transfirmations

Dataframe passed in processed with multiple tranformation performed,
  - row with a. null in field1 column is removed
  - field2 column value set as 1 if field3 value between 8 and 12, inclusive 
    of the 8 and 12. Other wise set as 2.
  - columns field1, field2, field3, field4 cast as integer type
  - columns val1,val2,val3 cast as double type   

 The function returns the processed dataframe 

"""
def processing(df):
  # drop rows with nulls in field1
  # set value for null in field2
  df1 = df.dropna(how='any', subset='field1')\
    .withColumn('field2', when(col('field2')== 'null', \
                                  when(((col('field3') < 12) & (col('field3') > 8)), 1)\
                               .otherwise(2)).otherwise(col('field2')))
 
  # cast field1, field2, field3, field4 to int
  # cast val1, val2, val3 to double
  df2 = df1\
    .withColumn('field1', col("field1").cast("int"))\
    .withColumn('field2', col("field2").cast("int"))\
    .withColumn('field3', col("field3").cast("int"))\
    .withColumn('field4', col("field4").cast("int"))\
    .withColumn('val1', col("val1").cast('double'))\
    .withColumn('val2', col("val2").cast('double'))\
    .withColumn('val3', col("val3").cast('double'))  

  # return df
  return df2



"""
data_joining(df, df_l)

parameters : 
  - df, main data frame to join
  - df_l, lookup data frame

  - Dataframes df and df_l are joined uner conditions field1 == f1 and field2 == f2
  - Column lookup_val is renamed 'new_dimension' 

 The function returns the joined dataframe 

"""
def data_joining(df, df_l):

  #  renaming field1 in look-up to f1 for ease
  df_l = df_l.withColumnRenamed('field1', 'f1')

  # joining data frames
  # renaming lookup_val to new dimension
  # modiflyin values in new_dimension based on condition
  #  dropping columns f1, f2
  data_joined = df.join(df_l, on=[col('field1')==col('f1'), col('field2')==col('f2')])\
    .withColumnRenamed('lookup_val', 'new_dimension')\
    .withColumn('new_dimension', when(((col('field1') == 10) | (col('field2') == 10)), 'XX')\
                .otherwise(col('new_dimension')))\
    .drop('f1', 'f2')            

  # returning dataframe
  return data_joined    




"""
aggre(df, df_l)

parameters : 
  - df, main data frame to aggregate

  - all value columns are summed
  - Relevance for val3 in each group in field 4 is calculated

 The function returns the joined dataframe 

"""

def aggre(df):

  # summing val1, val2, val3 to val_sum
  # calcualtion of relevance of val3 with field4 as val3_relevance
  # casting val3_relevance as decimal type
  df1 = df\
    .withColumn('val_sum', expr('val1 + val2 + val3'))\
    .withColumn('val3_relevance', expr("val3/(SUM(val3) over (PARTITION BY field4)) "))\
    .withColumn('val3_relevance', col('val3_relevance').cast(DecimalType(11,10)))


  return df1



 
if __name__ == "__main__":

  """  
    # casting system arguments into a string arragy
    args = str(sys.argv)

    args_length = len(args)

    if args_length > 3:
      date_arg = args[3]
    else:
      # assigned empty value to use the day's date for processing
      date_arg = None

    # reading look_up and output file locations
    lookp_up_arg = args[2]      
    output_arg = args[1]
      
  """
  # reading file date to process 
  date_arg = input("Enter date or leave empty: ")

  #  reading output folder path
  output_arg = './data/output'

  # reading look-up data
  data_lookup = spark.read.csv('./data/lookup.csv', header=True)

  # reading data file
  data = read_data(date_arg)

  # processing data file
  data_work_copy = processing(data)

  # joining main data and look-up
  data_joined = data_joining(data_work_copy, data_lookup)

  # data aggregations
  data_agg = aggre(data_joined)

  # setting date output write 
  date = date_arg
  if date_arg == "":
    date = datetime.today().strftime('%Y-%m-%d')



  # adding date column, processed_date column
  data_agg = data_agg.withColumn('date', lit(date))\
  .withColumn('date_processed', lit(datetime.today().strftime('%Y-%m-%d')))\
  .drop('field3')

  # checking if output file exists 
  if Path(output_arg).exists():
    #  reading output file
    data_store = spark.read.csv(output_arg, header=True)

    # creating final dataframe by joining current and main
    data_final = data_store.union(data_agg)

    # writing data to temp location
    data_final.write.csv(path='./data/temp_csv', mode='overwrite', header=True)

    # reading data from temp and overwriting output file
    df_final = spark.read.csv(path='./data/temp', header=True)
    df_final.write.csv(path=output_arg, mode='overwrite', header=True)


    # data_final.show(10)
  else:
      data_agg.write.csv(path=output_arg, mode='overwrite', header=True)



    

 



Enter date or leave empty: 
2021-07-26
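Requirements 5 and 9 also suggest a partitioned output instead of a single CSV folder that is re-read and rewritten on every run. A sketch of one option, assuming Parquet output and integer year/month/day partition columns derived from the `date` column: with Spark's `spark.sql.sources.partitionOverwriteMode` set to `dynamic` (available since Spark 2.3), an overwrite only replaces the partitions present in the DataFrame being written, so reprocessing a replacement file rewrites one day's folder and leaves the rest untouched. `partition_path` and `write_partitioned` are hypothetical helpers; the first only illustrates the Hive-style `column=value` directory layout Spark produces.

```python
from datetime import date


def partition_path(output_dir, data_date):
    """Hive-style partition folder that one day's data lands in.

    Mirrors the column=value folders Spark writes for
    partitionBy("year", "month", "day") when those are integer columns.
    """
    d = date.fromisoformat(data_date)
    return f"{output_dir}/year={d.year}/month={d.month}/day={d.day}"


def write_partitioned(spark, df, output_dir):
    """Write df as Parquet partitioned by year/month/day, replacing only the
    partitions present in df. Assumes df has a 'date' column as yyyy-mm-dd."""
    # imported here so partition_path above can be used without Spark installed
    from pyspark.sql import functions as F

    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    d = F.to_date("date")
    (df.withColumn("year", F.year(d))
       .withColumn("month", F.month(d))
       .withColumn("day", F.dayofmonth(d))
       .write.mode("overwrite")
       .partitionBy("year", "month", "day")
       .parquet(output_dir))
```

Queries that filter on year, or on year and month, can then prune to the matching folders once the external table is declared with these partition columns. Partitioning down to day keeps replacements cheap at the cost of more, smaller files; partitioning on year/month only would mean rewriting a whole month for each replacement file.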
