<a href="https://colab.research.google.com/github/MateoJacomeUPC/BDM/blob/master/BDM_Project_Parquet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Note: this code will need to be updated to work with Hadoop instead of Google Drive.

Reading a partitioned dataset from a filesystem
- https://arrow.apache.org/docs/python/filesystems.html#filesystem

Example:
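```
import pyarrow.dataset as ds
from pyarrow import fs

# fs.LocalFileSystem() for local paths; fs.HadoopFileSystem(host, port) for HDFS
dataset = ds.dataset("data/", filesystem=fs.LocalFileSystem())
```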



In [2]:
%pip install "dask[complete]"



In [3]:
import os
from datetime import datetime
import pandas as pd
import dask.dataframe as dd
import numpy as np
from pyarrow import fs
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pyarrow as pa

We will import the 11 .csv files of the opendatabcn-income dataset and write them to a single Parquet file.

Reading Partitioned CSV Data into a single table with pyarrow
- https://arrow.apache.org/cookbook/py/io.html#reading-partitioned-data
- https://arrow.apache.org/docs/python/generated/pyarrow.dataset.dataset.html#pyarrow.dataset.dataset
- https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.to_table

In [4]:
# returns table for processing to parquet for persistent storage
# returns file list so that files can be deleted after processing
def LoadPartitionedCSV(directory, folder, schema=None):
  """ 
  Input: a string for the data directory path,  
  a string of the folder name that contains partitioned data in csv format,
  a schema for the data if known, otherwise it will be inferred.
  Output: pyarrow table, list of loaded files
  """
  path = directory + "/" + folder
  # loading all csv files in path to a pyarrow dataset
  dataset = ds.dataset(path, schema=schema, format="csv")
  # loading the pyarrow dataset into a single pyarrow table
  table = dataset.to_table()
  return table, dataset.files

We define the schema explicitly so that each column uses a compact, appropriate datatype.
- https://arrow.apache.org/cookbook/py/schema.html#setting-the-schema-of-a-table
- https://arrow.apache.org/docs/python/api/datatypes.html

In [5]:
pa_schema = pa.schema([
    ("Any", pa.uint16()),
    ("Codi_Districte", pa.uint8()),
    ("Nom_Districte", pa.string()),
    ("Codi_Barri", pa.uint8()),
    ("Nom_Barri", pa.string()),
    ("Població", pa.uint32()),
    ("Índex RFD Barcelona = 100", pa.float64()) # expected to be float, but the messy '-' entries break parsing
])

The import fails because some rows of the index column contain the character "-" instead of a number, and "-" cannot be parsed as a float.

In [6]:
directory = "/content/drive/MyDrive/BDM-Project/Data"
# fails: pyarrow cannot parse "-" as float64, so the call is left commented out
# table, files = LoadPartitionedCSV(directory, "opendatabcn-income", pa_schema)

In the schema, we therefore define the index column as a string instead of a float so that the import succeeds; it is converted to a numeric type later.

In [7]:
pa_schema = pa.schema([
    ("Any", pa.uint16()),
    ("Codi_Districte", pa.uint8()),
    ("Nom_Districte", pa.string()),
    ("Codi_Barri", pa.uint8()),
    ("Nom_Barri", pa.string()),
    ("Població", pa.uint32()),
    ("Índex RFD Barcelona = 100", pa.string()) # should be float but there is messy data
])

In [8]:
directory = "/content/drive/MyDrive/BDM-Project/Data"
table, files = LoadPartitionedCSV(directory, "opendatabcn-income", pa_schema)

In [9]:
print(files)

['/content/drive/MyDrive/BDM-Project/Data/opendatabcn-income/2007_Distribucio_territorial_renda_familiar.csv', '/content/drive/MyDrive/BDM-Project/Data/opendatabcn-income/2008_Distribucio_territorial_renda_familiar.csv', '/content/drive/MyDrive/BDM-Project/Data/opendatabcn-income/2009_Distribucio_territorial_renda_familiar.csv', '/content/drive/MyDrive/BDM-Project/Data/opendatabcn-income/2010_Distribucio_territorial_renda_familiar.csv', '/content/drive/MyDrive/BDM-Project/Data/opendatabcn-income/2011_Distribucio_territorial_renda_familiar.csv', '/content/drive/MyDrive/BDM-Project/Data/opendatabcn-income/2012_Distribucio_territorial_renda_familiar.csv', '/content/drive/MyDrive/BDM-Project/Data/opendatabcn-income/2013_Distribucio_territorial_renda_familiar.csv', '/content/drive/MyDrive/BDM-Project/Data/opendatabcn-income/2014_Distribucio_territorial_renda_familiar.csv', '/content/drive/MyDrive/BDM-Project/Data/opendatabcn-income/2015_Distribucio_territorial_renda_familiar.csv', '/content

In [10]:
print(table)

pyarrow.Table
Any: uint16
Codi_Districte: uint8
Nom_Districte: string
Codi_Barri: uint8
Nom_Barri: string
Població: uint32
Índex RFD Barcelona = 100: string
----
Any: [[2007,2007,2007,2007,2007,2007,2007,2007,2007,2007,...,2007,2007,2007,2007,2007,2007,2007,2007,2007,2007],[2008,2008,2008,2008,2008,2008,2008,2008,2008,2008,...,2008,2008,2008,2008,2008,2008,2008,2008,2008,2008],[2009,2009,2009,2009,2009,2009,2009,2009,2009,2009,...,2009,2009,2009,2009,2009,2009,2009,2009,2009,2009],[2010,2010,2010,2010,2010,2010,2010,2010,2010,2010,...,2010,2010,2010,2010,2010,2010,2010,2010,2010,2010],[2011,2011,2011,2011,2011,2011,2011,2011,2011,2011,...,2011,2011,2011,2011,2011,2011,2011,2011,2011,2011],[2012,2012,2012,2012,2012,2012,2012,2012,2012,2012,...,2012,2012,2012,2012,2012,2012,2012,2012,2012,2012],[2013,2013,2013,2013,2013,2013,2013,2013,2013,2013,...,2013,2013,2013,2013,2013,2013,2013,2013,2013,2013],[2014,2014,2014,2014,2014,2014,2014,2014,2014,2014,...,2014,2014,2014,2014,2014,2014,2014,

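If no schema is provided, pyarrow infers the datatypes. As the output below shows, it chooses int64 for every integer column, which uses more memory than the narrower unsigned types defined in our schema.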

In [11]:
table_noSchema, files = LoadPartitionedCSV(directory, "opendatabcn-income")

In [12]:
print(table_noSchema)

pyarrow.Table
Any: int64
Codi_Districte: int64
Nom_Districte: string
Codi_Barri: int64
Nom_Barri: string
Població: int64
Índex RFD Barcelona = 100: string
----
Any: [[2007,2007,2007,2007,2007,2007,2007,2007,2007,2007,...,2007,2007,2007,2007,2007,2007,2007,2007,2007,2007],[2008,2008,2008,2008,2008,2008,2008,2008,2008,2008,...,2008,2008,2008,2008,2008,2008,2008,2008,2008,2008],[2009,2009,2009,2009,2009,2009,2009,2009,2009,2009,...,2009,2009,2009,2009,2009,2009,2009,2009,2009,2009],[2010,2010,2010,2010,2010,2010,2010,2010,2010,2010,...,2010,2010,2010,2010,2010,2010,2010,2010,2010,2010],[2011,2011,2011,2011,2011,2011,2011,2011,2011,2011,...,2011,2011,2011,2011,2011,2011,2011,2011,2011,2011],[2012,2012,2012,2012,2012,2012,2012,2012,2012,2012,...,2012,2012,2012,2012,2012,2012,2012,2012,2012,2012],[2013,2013,2013,2013,2013,2013,2013,2013,2013,2013,...,2013,2013,2013,2013,2013,2013,2013,2013,2013,2013],[2014,2014,2014,2014,2014,2014,2014,2014,2014,2014,...,2014,2014,2014,2014,2014,2014,2014,20

Using Dask with Remote Data
- https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html
- https://docs.dask.org/en/latest/generated/dask.dataframe.read_csv.html#dask.dataframe.read_csv

In [13]:
# returns dataframe for processing to parquet for persistent storage
# needs to return file list so that files can be deleted after processing?
def DaskLoadPartitionedCSV(directory, folder):
  """ 
  Input: a string for the data directory path,  
  a string of the folder name that contains partitioned data in csv format
  Output: dask dataframe with an extra 'sourceFile' column holding each row's source path
  """
  path = directory + "/" + folder + '/*.csv'
  # loading all csv files in path to a single dask dataframe; files larger than 64MB are split into several partitions
  df = dd.read_csv(path, include_path_column='sourceFile', blocksize='64MB')
  return df

In [14]:
directory = "/content/drive/MyDrive/BDM-Project/Data"
df = DaskLoadPartitionedCSV(directory, "opendatabcn-income")

In [15]:
df.columns

Index(['Any', 'Codi_Districte', 'Nom_Districte', 'Codi_Barri', 'Nom_Barri',
       'Població', 'Índex RFD Barcelona = 100', 'sourceFile'],
      dtype='object')

In [16]:
df.info

<bound method DataFrame.info of Dask DataFrame Structure:
                  Any Codi_Districte Nom_Districte Codi_Barri Nom_Barri Població Índex RFD Barcelona = 100       sourceFile
npartitions=11                                                                                                             
                int64          int64        object      int64    object    int64                    object  category[known]
                  ...            ...           ...        ...       ...      ...                       ...              ...
...               ...            ...           ...        ...       ...      ...                       ...              ...
                  ...            ...           ...        ...       ...      ...                       ...              ...
                  ...            ...           ...        ...       ...      ...                       ...              ...
Dask Name: from-delayed, 33 tasks>


Setting Datatypes in Dask
- https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.astype.html

In [18]:
# mixed datatype columns must be converted using dd.to_numeric()
schema = {
    'Any':'uint16',
    'Codi_Districte':'uint8',
    'Nom_Districte': "string[pyarrow]",
    'Codi_Barri':'uint8',
    'Nom_Barri': "string[pyarrow]",
    'Població':'uint32',
    'Índex RFD Barcelona = 100': "string[pyarrow]",
    'sourceFile': "string[pyarrow]" 
}

In [19]:
df = df.astype(schema)

Converting dtypes to numeric using errors='coerce'
- https://docs.dask.org/en/latest/generated/dask.dataframe.to_numeric.html

In [20]:
df['Índex RFD Barcelona = 100'] = dd.to_numeric(df['Índex RFD Barcelona = 100'], errors='coerce')

In [21]:
df.compute().shape

(811, 8)

In [22]:
df.dtypes

Any                          uint16
Codi_Districte                uint8
Nom_Districte                string
Codi_Barri                    uint8
Nom_Barri                    string
Població                     uint32
Índex RFD Barcelona = 100     int64
sourceFile                   string
dtype: object

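The numeric conversion worked correctly. The decimal values are retained in the 'Índex RFD Barcelona = 100' column, and the 8 values that were '-' have been replaced with null values (the summary below counts 803 non-null values out of 811 rows). The dtype is nevertheless reported as int64 instead of float because Dask's `df.dtypes` reflects the dataframe's metadata, not the computed data.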
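`dd.to_numeric` essentially applies pandas' `to_numeric` to each partition, so the coercion can be checked on a small pandas Series of made-up values. The '-' entry becomes NaN and the result is float64. In Dask, an explicit `.astype("float64")` after the conversion is one way (untested here) to make the reported dtype match the data.

```python
import pandas as pd

# Made-up values mimicking the raw index column
raw = pd.Series(["64.7", "-", "86.5"])

values = pd.to_numeric(raw, errors="coerce")  # '-' cannot be parsed, so it becomes NaN
print(values.dtype)
print(values.isna().sum())
```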

In [23]:
df.head(10)

,Any,Codi_Districte,Nom_Districte,Codi_Barri,Nom_Barri,Població,Índex RFD Barcelona = 100,sourceFile
0,2007,1,Ciutat Vella,1,el Raval,46595,64.7,/content/drive/MyDrive/BDM-Project/Data/openda...
1,2007,1,Ciutat Vella,2,el Barri Gòtic,27946,86.5,/content/drive/MyDrive/BDM-Project/Data/openda...
2,2007,1,Ciutat Vella,3,la Barceloneta,15921,66.7,/content/drive/MyDrive/BDM-Project/Data/openda...
3,2007,1,Ciutat Vella,4,"Sant Pere, Santa Caterina i la Ribera",22572,80.2,/content/drive/MyDrive/BDM-Project/Data/openda...
4,2007,2,Eixample,5,el Fort Pienc,31521,107.9,/content/drive/MyDrive/BDM-Project/Data/openda...
5,2007,2,Eixample,6,la Sagrada Família,52185,101.8,/content/drive/MyDrive/BDM-Project/Data/openda...
6,2007,2,Eixample,7,la Dreta de l'Eixample,42504,137.6,/content/drive/MyDrive/BDM-Project/Data/openda...
7,2007,2,Eixample,8,l'Antiga Esquerra de l'Eixample,41413,126.5,/content/drive/MyDrive/BDM-Project/Data/openda...
8,2007,2,Eixample,9,la Nova Esquerra de l'Eixample,58146,116.9,/content/drive/MyDrive/BDM-Project/Data/openda...
9,2007,2,Eixample,10,Sant Antoni,37988,103.8,/content/drive/MyDrive/BDM-Project/Data/openda...


In [24]:
df.compute().describe(include='all')

,Any,Codi_Districte,Nom_Districte,Codi_Barri,Nom_Barri,Població,Índex RFD Barcelona = 100,sourceFile
count,811.0,811.0,811,811.0,811,811.0,803.0,811
unique,,,11,,74,,,11
top,,,Nou Barris,,el Raval,,,/content/drive/MyDrive/BDM-Project/Data/openda...
freq,,,143,,11,,,74
mean,2011.993835,7.161529,,37.611591,,21943.408138,92.588917,
std,3.158951,9.583283,,21.857582,,14618.144852,40.084215,
min,2007.0,1.0,,1.0,,1.0,34.3,
25%,2009.0,4.0,,19.0,,10355.5,67.5,
50%,2012.0,7.0,,37.0,,19732.0,82.5,
75%,2015.0,8.0,,56.0,,30288.0,103.8,


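Dask writes one Parquet file per partition, and the dataframe currently has 11 partitions (one per CSV file). Since I want a single output file, I will repartition the dataframe into 1 partition.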

Repartitioning a dask dataframe
- https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.repartition.html

In [25]:
df = df.repartition(npartitions=1)

Saving as Parquet in Dask
- https://docs.dask.org/en/latest/generated/dask.dataframe.to_parquet.html

In [26]:
pa_schema = pa.schema([
    ("Any", pa.uint16()),
    ("Codi_Districte", pa.uint8()),
    ("Nom_Districte", pa.string()),
    ("Codi_Barri", pa.uint8()),
    ("Nom_Barri", pa.string()),
    ("Població", pa.uint32()),
    ("Índex RFD Barcelona = 100", pa.float64())
])

In [29]:
# name_function = lambda x: f"openBcnIncomeData-{x}.parquet"
# df.to_parquet using pyarrow did not work with name_function
df.to_parquet(directory, engine='pyarrow', compression='snappy',
              write_index=False, 
              append=False, ignore_divisions=False, 
              partition_on=None, storage_options=None, 
              write_metadata_file=True, compute=True, 
              schema=pa_schema)


In [33]:
# computes the Dask df into a pandas df and loads it into a pyarrow table;
# columns missing from pa_schema (here 'sourceFile') are dropped
table = pa.Table.from_pandas(df.compute(), schema=pa_schema)

In [34]:
table.schema

Any: uint16
Codi_Districte: uint8
Nom_Districte: string
Codi_Barri: uint8
Nom_Barri: string
Població: uint32
Índex RFD Barcelona = 100: double
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [{"name": null, "field_n' + 1070