# TD2 – A Hands-On Exploration of NoSQL Columnar Storage Using Apache Parquet


In [2]:
import datetime as dt
import os

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

### Create a PyArrow schema

In [None]:
# Declare the column names and Arrow types used for the weather data.
weather_schema = pa.schema([
    pa.field('city', pa.string()),
    pa.field('measurement_time', pa.timestamp('ms')),  # millisecond resolution
    pa.field('temperature', pa.float32()),
    pa.field('atmospheric_pressure', pa.float32()),
])

# End the cell with the schema itself so it is rendered as the cell output.
weather_schema


city: string
measurement_time: timestamp[ms]
temperature: float
atmospheric_pressure: float

### Create the column data as PyArrow arrays

In [7]:
# Plain Python values for each weather column; position i in every list
# belongs to the same observation.
city_names = ['New York', 'London', 'Tokyo']
observed_at = [dt.datetime(2022, 5, 1, hour, 0, 0) for hour in (12, 13, 14)]
temperature_values = [20.5, 15.2, 23.1]
pressure_values = [101.5, 99.2, 100.1]

# Convert each list into an Arrow array with an explicit type.
cities = pa.array(city_names, type=pa.string())
measurement_times = pa.array(observed_at, type=pa.timestamp('ms'))
temperatures = pa.array(temperature_values, type=pa.float32())
pressures = pa.array(pressure_values, type=pa.float32())

### Create a PyArrow RecordBatch 

In [10]:
# Assemble the column arrays into a single RecordBatch.
# The full schema is passed (not just its field names) so the batch is built
# against the declared column types as well as the column names.
batch = pa.RecordBatch.from_arrays(
    [cities, measurement_times, temperatures, pressures],
    schema=weather_schema,
)

# Bare expression: display the batch as the cell output.
batch

pyarrow.RecordBatch
city: string
measurement_time: timestamp[ms]
temperature: float
atmospheric_pressure: float
----
city: ["New York","London","Tokyo"]
measurement_time: [2022-05-01 12:00:00.000,2022-05-01 13:00:00.000,2022-05-01 14:00:00.000]
temperature: [20.5,15.2,23.1]
atmospheric_pressure: [101.5,99.2,100.1]

### Convert the RecordBatch into a PyArrow Table

In [12]:
# Build a Table from a list holding the single RecordBatch.
record_batches = [batch]
table = pa.Table.from_batches(record_batches)

# Bare expression: display the table as the cell output.
table

pyarrow.Table
city: string
measurement_time: timestamp[ms]
temperature: float
atmospheric_pressure: float
----
city: [["New York","London","Tokyo"]]
measurement_time: [[2022-05-01 12:00:00.000,2022-05-01 13:00:00.000,2022-05-01 14:00:00.000]]
temperature: [[20.5,15.2,23.1]]
atmospheric_pressure: [[101.5,99.2,100.1]]

### Save the table in Parquet Format

In [13]:
# Persist the weather table as a Parquet file in the working directory.
pq.write_table(table, where='weather.parquet')

### Read columns from the Parquet file 

In [15]:
# Read back only the listed columns from the Parquet file.
wanted_columns = ['city', 'temperature']
ctemp = pq.read_table('weather.parquet', columns=wanted_columns)

# Bare expression: display the resulting table as the cell output.
ctemp

pyarrow.Table
city: string
temperature: float
----
city: [["New York","London","Tokyo"]]
temperature: [[20.5,15.2,23.1]]

In [16]:
# Convert the Arrow table to a pandas DataFrame.
df_ctemp = ctemp.to_pandas()

# Bare last expression: the notebook renders the DataFrame with its rich
# display instead of the plain-text print() form.
df_ctemp

       city  temperature
0  New York         20.5
1    London         15.2
2     Tokyo         23.1


### Import the vente-produit-2016virgule.csv file


In [23]:
# Load the CSV file into a DataFrame, then print its (rows, columns) shape.
csv_path = 'vente-produit-2016virgule.csv'
df = pd.read_csv(csv_path)
print(df.shape)

(81524, 6)


In [25]:
# Preview the first five rows of the DataFrame.
df.head(n=5)

Unnamed: 0,annee,departement,amm,quantite_produit,conditionnement,eaj
0,2016,AIN,1030003,4.3,L,Oui
1,2016,AIN,2000003,6064.4,L,Non
2,2016,AIN,2000017,9.6,L,Oui
3,2016,AIN,2000018,12740.4,K,Non
4,2016,AIN,2000044,594.0,L,Non


### Compression of Parquet files 

In [26]:
# Convert the pandas DataFrame to an Arrow Table.
table_vente = pa.Table.from_pandas(df)

# Write the same table once per compression codec, to vente_<codec>.parquet.
# 'snappy' is named explicitly rather than relying on write_table's default,
# and 'none' writes the file without compression.
compression_codecs = ['snappy', 'gzip', 'brotli', 'zstd', 'lz4', 'none']

for codec in compression_codecs:
    pq.write_table(table_vente, f'vente_{codec}.parquet', compression=codec)

### Check the file size for each compression codec

In [27]:
# Parquet files whose on-disk size is reported below.
parquet_files = [
    'vente_snappy.parquet',
    'vente_gzip.parquet',
    'vente_brotli.parquet',
    'vente_zstd.parquet',
    'vente_lz4.parquet',
    'vente_none.parquet'
]

for file_name in parquet_files:
    # Size on disk in bytes, then divided by 1024 per step for KB and MB.
    file_size_bytes = os.path.getsize(file_name)
    file_size_kb = file_size_bytes / 1024
    file_size_mb = file_size_bytes / (1024 * 1024)

    print(f"{file_name}:")
    print(f"  - {file_size_bytes} bytes")
    print(f"  - {file_size_kb:.2f} KB")
    print(f"  - {file_size_mb:.2f} MB\n")


vente_snappy.parquet:
  - 401972 bytes
  - 392.55 KB
  - 0.38 MB

vente_gzip.parquet:
  - 299743 bytes
  - 292.72 KB
  - 0.29 MB

vente_brotli.parquet:
  - 288537 bytes
  - 281.77 KB
  - 0.28 MB

vente_zstd.parquet:
  - 318895 bytes
  - 311.42 KB
  - 0.30 MB

vente_lz4.parquet:
  - 372186 bytes
  - 363.46 KB
  - 0.35 MB

vente_none.parquet:
  - 467782 bytes
  - 456.82 KB
  - 0.45 MB



### Partitioning of data

In [28]:
# Write the table as a partitioned dataset under the 'partitions' folder,
# split by the values of the 'annee' and 'departement' columns.
pq.write_to_dataset(
    table_vente,
    root_path='partitions',                   # Folder where partitions will be created
    partition_cols=['annee', 'departement'],  # The columns to partition by
    # Replace the files of any partition being written, so re-running this cell
    # does not leave duplicate copies of the data in 'partitions'.
    # NOTE(review): requires a pyarrow version that supports this option — confirm.
    existing_data_behavior='delete_matching',
)