# Merge CSV Files Demo

The following PySpark job merges all CSV files in a folder, de-duplicates the result, and writes the merged file to a new location.

>NOTE: All of the data in the example was generated with Python Fake and Pandas. None of it
>represents real people or real data.

## Script Actions

The script below performs the following tasks:

* Reads all `*.csv` files in a folder location
* Merges all files, then de-duplicates the final DataFrame
* Writes the consolidated CSV to a new location

```shell
# Sample Folder Locations
Input Dir   : Dev/Data/demo/{file0.csv,file1.csv,file2.csv}
Output Dir  : Dev/Data/demo/merged/master.csv
```

>NOTE: Rather than listing files individually, you could use wildcards to pull
>all files from a directory `/path-to-csv-files/*.csv`

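To see which files a pattern such as `*.csv` would select, here is a small sketch using Python's standard `glob` module against a temporary folder. The folder mirrors the sample locations above; the temporary directory and the helper name `match_csv` are assumptions made for illustration. Spark resolves wildcards with Hadoop's glob rules, which are similar but not identical to Python's, so treat this as an approximation.

```python
import glob
import os
import tempfile


def match_csv(folder):
    """Return the sorted base names matched by <folder>/*.csv (hypothetical helper)."""
    pattern = os.path.join(folder, "*.csv")
    return sorted(os.path.basename(p) for p in glob.glob(pattern))


# Build a throwaway folder that mirrors the sample layout.
demo_dir = tempfile.mkdtemp()
for name in ("file0.csv", "file1.csv", "file2.csv"):
    open(os.path.join(demo_dir, name), "w").close()

# The merged output lives in a sub-folder, so "*.csv" does not pick it up.
os.makedirs(os.path.join(demo_dir, "merged"))
open(os.path.join(demo_dir, "merged", "master.csv"), "w").close()

matched = match_csv(demo_dir)
print(matched)  # ['file0.csv', 'file1.csv', 'file2.csv']
```

Writing the output to a sub-folder such as `merged/` keeps a later run from re-reading its own result.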

## CSV File Contents

Three files were used to test the merge, each containing First, Last, and Birthdate columns.

* file0.csv has 10 unique entries
* file1.csv has 10 unique entries plus 5 lines duplicated from file0.csv
* file2.csv has 10 unique entries plus 10 lines duplicated from file1.csv

The script below should combine the CSV files and produce a master.csv without duplicates.

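As a sanity check on the expected result, the overlap described above can be modelled with plain Python sets. The row labels (`row0`, `row1`, and so on) are hypothetical placeholders rather than the real file contents. Which lines are repeated does not change the count: the three files hold 45 lines in total, and 30 distinct rows should remain after de-duplication.

```python
# Hypothetical row labels standing in for (First, Last, Birthdate) records.
file0 = [f"row{i}" for i in range(0, 10)]                # 10 unique rows
file1 = [f"row{i}" for i in range(10, 20)] + file0[:5]   # 10 unique + 5 repeats of file0
file2 = [f"row{i}" for i in range(20, 30)] + file1[:10]  # 10 unique + 10 repeats of file1

all_rows = file0 + file1 + file2
total_lines = len(all_rows)       # lines read across the three files
unique_rows = len(set(all_rows))  # rows left after de-duplication

print(total_lines, unique_rows)  # 45 30
```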

## File Schema

If the output format were `Parquet`, a schema should be used so that it accompanies the partitions. As the output is
CSV, no schema information is required, because CSV files do not save schema data. However, a schema can help rename columns from the original CSV, or supply column names when the original CSV does not contain a header row.

>NOTE: Python is not statically typed. When using Python as the main data-processing language, you should add
>schemas where possible. Java and Scala are statically typed languages, but it is still advisable to use schemas
>when you can.

```python
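from pyspark.sql.types import StructType, StructField, StringType

# Define schema ( Data-Types ); every column is read as a nullable string
csvSchema = StructType([
    StructField("First", StringType(), True),
    StructField("Last", StringType(), True),
    StructField("Birthdate", StringType(), True),
])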
```

## Azure Blob Storage Access

To use Blob Storage on Azure, it is merely a matter of mounting the path in the Spark job, then setting
the input and output locations you need.

For a full example, see: [Run a Databricks notebook with the Databricks Notebook Activity in Azure Data Factory](https://docs.microsoft.com/en-us/azure/data-factory/transform-data-using-databricks-notebook)

The Azure link above demonstrates how to:

* Create a data factory.
* Create a pipeline that uses Databricks Notebook Activity.
* Trigger a pipeline run.
* Monitor the pipeline run.

```python
# Mount the remote location
dbutils.fs.mount(
  source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
  mount_point = "/mnt/<mount-name>",
  extra_configs = {"<conf-key>":dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")})

# Set the path in your script; both forms refer to the same mounted location
df = spark.read.csv("/mnt/<mount-name>/...")
df = spark.read.csv("dbfs:/mnt/<mount-name>/...")
```

## Databricks Notebook

The following script is what would be saved to a [Databricks Notebook](https://docs.databricks.com/notebooks/index.html) and then run directly or via an [Azure Data Factory pipeline](https://docs.microsoft.com/en-us/azure/data-factory/transform-data-using-databricks-notebook).

In [1]:

```python
import os
from os.path import expanduser

import findspark
findspark.init()

from pyspark import SparkConf, SparkContext

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as f
from pyspark.sql.types import *
```

In [2]:

```python
# In-File Location ( Azure Data Lake or AWS s3://<file-location-bucket> )
home_dir = expanduser("~")
inFiles = os.path.join(home_dir, 'Dev/Data/demo/*.csv')
mergeDir = os.path.join(home_dir, 'Dev/Data/demo/merged/')

# Define schema ( Data-Types )
csvSchema = StructType([
    StructField("First", StringType(), True),
    StructField("Last", StringType(), True),
    StructField("Birthdate", StringType(), True),
])

# Set up the Spark cluster config variables
conf = SparkConf().setAppName("Project-1 Merge CSV Files").setMaster("local[*]")

# Instantiate the Spark Session
spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .getOrCreate()

# Create the DataFrame: read all matching files, then de-duplicate
df = spark.read.format("csv") \
    .option("header", "True") \
    .option("sep", ",") \
    .schema(csvSchema) \
    .load(inFiles) \
    .dropDuplicates()

# Show the contents of merged files
df.orderBy('First', 'Last', ascending=True).show(40)

# Write the master CSV file (collects all rows to the driver via Pandas)
df.toPandas().to_csv(os.path.join(mergeDir, 'project1.csv'), header=True, sep=',', index=False)
```

```text
+--------+----------+----------+
|   First|      Last| Birthdate|
+--------+----------+----------+
| Anatole|    Brekke|1978-07-05|
|   Arman|    Hudson|1959-06-23|
|   Aubra|   Pollich|2001-03-13|
|   Belen|    Klocko|1962-01-24|
|   Bobby|     Hilll|1989-08-24|
| Cherrie|    Nienow|1955-04-14|
|Chrystal|    Zemlak|1999-01-04|
|   Cliff|    Hamill|1966-01-30|
| Danniel|   Shields|1961-10-19|
| Delaney|     Morar|2007-10-21|
|    Evia|   Waelchi|1953-04-13|
|  Gladis|   Collier|1982-10-11|
|  Gustav|     Bauch|1974-02-06|
|  Harlie|     Borer|1965-03-30|
|  Hobart|   Keebler|1968-09-25|
|Humphrey|     Thiel|1949-05-04|
|   Jenna|    Feeney|1966-02-16|
|    Jere|     Pagac|1944-04-04|
|  Justen|  Champlin|1994-10-02|
|   Kenji|    Harvey|2007-03-27|
|   Mario|Bartoletti|1981-03-07|
|   Mario|     Nader|1993-05-02|
|  Neppie|     Runte|1949-05-31|
|    Olof|   McGlynn|1982-10-07|
|   Ruthe|     Emard|1958-10-14|
|   Selah|  Reynolds|1988-06-16|
|  Tawnya|    Dooley|1951-12-26|
|   Telly|
```

In [3]:

```python
# Shut down the PySpark engine.
spark.stop()
```