# Test BackMarket - Baptiste Geslin

Data Pipeline Assessment.

You can develop & refactor your code (using your versioning tool) following this pipeline:

1 - Download and read the file: product_catalog.csv locally <br>
2 - Transform the file from CSV to Parquet format locally <br>
3 - Separate the valid rows from the invalid ones into two separate files: the business wants only the products with an image but wants to archive the invalid rows



Yeah, well done!
Now that Back Market is growing so fast, what would you do to tackle the massive new CSV files?
Describe the next steps for your code to scale it up.

In [1]:
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
import databricks.koalas as ks
import unittest
import pyarrow.parquet as pq



## Using PANDAS

pandas is a Python library widely used by data scientists. It provides high-performance, easy-to-use data structures and data analysis tools for exploring, analysing and manipulating datasets.<br>
In our case, product_catalog.csv is only about 1 MB, so pandas is appropriate. <br>
For larger datasets, a large-scale data processing tool such as Spark may be necessary.

In [2]:
def PipelineWithPandas(path):
    """
    This function imports product_catalog.csv file located in the given path.
    It splits the data into 2 parquet files :
        - valid rows with an image specified
        - invalid rows without image supplied
    This function uses the pandas library.
    """
       
    # import product_catalog.csv
    product_catalog = pd.read_csv(path,sep=",",header=0,  encoding="utf-8")
    
    # separate the data into two dataframes : invalid_rows and valid_rows
    invalid_rows = product_catalog[product_catalog['image'].isnull()]
    valid_rows = product_catalog[product_catalog['image'].notnull()] 
    
    # Rows can be sorted on the fields most frequently used in queries
    # to improve query performance.
    # Here I sort on the "category_id" column so that products of the same category can be queried efficiently.
    valid_rows = valid_rows.sort_values(by=['category_id'])
    invalid_rows = invalid_rows.sort_values(by=['category_id'])
    
    # create parquet files locally
    invalid_rows.to_parquet('invalid_rows_pandas.parquet', compression='snappy')
    valid_rows.to_parquet('valid_rows_pandas.parquet', compression='snappy') 
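
The filter above relies on `pd.read_csv` converting empty fields to NaN by default, so a whitespace-only `image` value would still count as valid. As a hedged sketch, assuming the business also treats blank or whitespace-only values as "no image" (an assumption, not something the task states), the split rule could be isolated in a small helper. The name `split_by_image` is hypothetical.

```python
import pandas as pd


def split_by_image(df, column="image"):
    """Return (valid_rows, invalid_rows) for the given DataFrame.

    Assumption: a row is invalid when `column` is missing (NaN/None)
    or contains only whitespace.
    """
    # Missing values as pandas sees them (NaN or None).
    missing = df[column].isna()
    # Values that are present but empty once whitespace is stripped.
    blank = df[column].astype(str).str.strip() == ""
    has_image = ~(missing | blank)
    return df[has_image], df[~has_image]


# Small in-memory example; real data would come from pd.read_csv.
sample = pd.DataFrame(
    {"image": ["a.png", None, "   "], "category_id": [1, 2, 3]}
)
valid_rows, invalid_rows = split_by_image(sample)
print(len(valid_rows), len(invalid_rows))
```

Keeping the rule in one function means the pandas pipeline and its tests can share the same definition of a valid row.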

## Using SPARK

Apache Spark is a unified analytics engine for large-scale data processing. 
It is well suited to processing the massive new CSV files.

In [3]:
# Define SparkContext and SparkSession
sc = SparkContext.getOrCreate()
ss = SparkSession.builder.getOrCreate()

In [4]:
def PipelineWithSpark(path): 
    """
    This function imports product_catalog.csv file located in the given path.
    It splits the file into 2 parquet files :
        - valid rows with an image specified
        - invalid rows without image supplied
    This function uses the spark engine.
    """
    
    # import product_catalog.csv
    product_catalog = ss.read.load(path, format="csv", sep=",", inferSchema="true", header="true")
    
    # separate the data into two dataframes : invalid_rows and valid_rows
    invalid_rows = product_catalog.where(product_catalog["image"].isNull())
    valid_rows = product_catalog.where(product_catalog["image"].isNotNull())
    
    # create parquet files locally
    invalid_rows.write.mode('overwrite').parquet('invalid_rows_pyspark.parquet')
    valid_rows.write.mode('overwrite').parquet('valid_rows_pyspark.parquet') 

## Using KOALAS

Koalas (a Databricks open source project) implements the pandas API on top of Apache Spark. <br>
It combines Spark's distributed execution and performance with pandas' easy-to-use API. 

In [5]:
def PipelineWithKoalas(path):
    """
    This function imports product_catalog.csv file located in the given path.
    It splits the file into 2 parquet files :
        - valid rows with an image specified
        - invalid rows without image supplied
    This function uses the koalas library.
    """
    
    # import product_catalog.csv   
    product_catalog = ss.read.load(path, format="csv", sep=",", inferSchema="true", header="true").to_koalas()
    
    # separate the data into two dataframes : invalid_rows and valid_rows
    invalid_rows = product_catalog[product_catalog["image"].isna()]
    valid_rows = product_catalog.dropna(subset=['image'])
    
    # create parquet files locally
    invalid_rows.to_parquet('invalid_rows_koalas.parquet', mode = 'overwrite')
    valid_rows.to_parquet('valid_rows_koalas.parquet', mode = 'overwrite') 

## UNIT TESTS

In [6]:
"""
Unit tests for Pipeline functions : 
    - PipelineWithSpark
    - PipelineWithKoalas
    - PipelineWithPandas

"""
class PipelineFunctionTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Define path (shared by all tests through the class attribute)
        cls.path = "product_catalog.csv"
    
        
    def testPipelineWithSpark(self):
        # Run Spark pipeline
        PipelineWithSpark(self.path)
        
        # Read parquet files with pyarrow
        valid_spark = pq.read_table('valid_rows_pyspark.parquet').to_pandas()
        invalid_spark = pq.read_table('invalid_rows_pyspark.parquet').to_pandas()
        
        # Perform tests
        self.assertFalse(valid_spark["image"].isnull().any())
        self.assertTrue(invalid_spark["image"].isnull().all())
        
        
    def testPipelineWithPandas(self):
        # Run Pandas pipeline
        PipelineWithPandas(self.path)
        
        # Read parquet files with pyarrow
        valid_pandas = pq.read_table('valid_rows_pandas.parquet').to_pandas()
        invalid_pandas = pq.read_table('invalid_rows_pandas.parquet').to_pandas()
        
        # Perform tests
        self.assertFalse(valid_pandas["image"].isnull().any())
        self.assertTrue(invalid_pandas["image"].isnull().all())
        
        
    def testPipelineWithKoalas(self):
        # Run Koalas pipeline
        PipelineWithKoalas(self.path)
        
        # Read parquet files with pyarrow
        valid_koalas = pq.read_table('valid_rows_koalas.parquet').to_pandas()
        invalid_koalas = pq.read_table('invalid_rows_koalas.parquet').to_pandas()
        
        # Perform tests
        self.assertFalse(valid_koalas["image"].isnull().any())
        self.assertTrue(invalid_koalas["image"].isnull().all())

In [7]:
# Execute the tests
tests = unittest.defaultTestLoader.loadTestsFromTestCase(PipelineFunctionTest)
runner = unittest.TextTestRunner(verbosity=2)
runner.run(tests)

testPipelineWithKoalas (__main__.PipelineFunctionTest) ... ok
testPipelineWithPandas (__main__.PipelineFunctionTest) ... ok
testPipelineWithSpark (__main__.PipelineFunctionTest) ... ok

----------------------------------------------------------------------
Ran 3 tests in 18.973s

OK


<unittest.runner.TextTestResult run=3 errors=0 failures=0>
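
The three tests check that `image` is never null in the valid file and always null in the invalid one, but not that every source row ends up in exactly one of the two outputs. One possible extra check, sketched here with a hypothetical helper `assert_is_partition`, also compares row counts. It assumes the source CSV and both outputs have already been loaded as pandas DataFrames.

```python
import pandas as pd


def assert_is_partition(source, valid, invalid, column="image"):
    """Raise AssertionError unless valid/invalid split `source` on `column`."""
    # No row may be lost or duplicated by the split.
    assert len(valid) + len(invalid) == len(source), "row count mismatch"
    # Every valid row has an image; no invalid row has one.
    assert valid[column].notna().all(), "valid rows contain a missing image"
    assert invalid[column].isna().all(), "invalid rows contain an image"


# Small in-memory example standing in for the CSV and the Parquet outputs.
source = pd.DataFrame({"image": ["a.png", None, "b.png"]})
assert_is_partition(
    source,
    source[source["image"].notna()],
    source[source["image"].isna()],
)
```

Inside `testPipelineWithPandas`, for instance, it could be called as `assert_is_partition(pd.read_csv(self.path), valid_pandas, invalid_pandas)`.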