# Introduction
This notebook downloads the data for the [Safety](https://www.aiforsea.com/safety) challenge and stores it in dbfs as parquet files.

The feature csv files are consolidated into one parquet file.

# Libraries

In [3]:
from pathlib import Path
import pyspark.sql.types as types

# Download

Download the data archive from the challenge URL, unpack it into /tmp, and delete the archive:

In [6]:
%sh
wget https://s3-ap-southeast-1.amazonaws.com/grab-aiforsea-dataset/safety.zip -P /tmp/ --quiet
unzip /tmp/safety.zip -d /tmp 
rm /tmp/safety.zip

# Consolidate

Rename the files to human-readable names. The feature files are consolidated later, when they are loaded together and written out as one parquet file:

In [9]:
%sh
find /tmp/safety -type f | sort | grep -v DS_Store

In [10]:
for file in Path('/tmp/safety').glob('features/*.csv'):
  # File names are dash-separated; the second field is the zero-padded part number.
  _, file_part, *_ = file.name.split('-')
  file_part = int(file_part)
  file_dir = file.parent
  new_name = f'feature_{file_part}.csv'

  file.rename(file_dir / new_name)
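
The renaming rule can be tried in isolation, without touching the file system. The following is a minimal sketch: the helper name `feature_name` is hypothetical, and the example file name is an assumption that simply follows the same pattern as the label file renamed further down.

```python
def feature_name(original_name: str) -> str:
    """Map a 'part-<number>-...' file name to the short name used in this notebook."""
    # The second dash-separated field is the zero-padded part number.
    _, file_part, *_ = original_name.split('-')
    return f'feature_{int(file_part)}.csv'


# Hypothetical file name, modelled on the label file's name.
example = 'part-00003-e9445087-aa0a-433b-a7f6-7f4c19d78ad6-c000.csv'
print(feature_name(example))  # feature_3.csv
```

Note that a file that has already been renamed (for example `feature_3.csv`) contains no dashes, so the unpacking raises a `ValueError`. Re-running the rename cell on an already processed directory would therefore fail rather than silently rename again.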

In [11]:
%sh
mv /tmp/safety/labels/part-00000-e9445087-aa0a-433b-a7f6-7f4c19d78ad6-c000.csv /tmp/safety/labels/label.csv

In [12]:
%sh
find /tmp/safety -type f | sort | grep -v DS_Store

Move the files to dbfs:

In [14]:
for file in Path('/tmp/safety').glob('**/*.csv'):
  dbutils.fs.mv(f'file:{file}', 'dbfs:/msh/grab/data')
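
The loop above passes the target directory as the destination. One possible variant spells out the full destination path for every file, so the result does not depend on how the target is interpreted. This is a sketch only: the helper `dbfs_moves` is hypothetical, and it just builds the source and destination pairs that could then be handed to `dbutils.fs.mv`.

```python
from pathlib import Path


def dbfs_moves(local_root: str, dbfs_dir: str):
    """Return (source, destination) URI pairs for every csv file under local_root."""
    target = dbfs_dir.rstrip('/')
    return [
        (f'file:{path}', f'{target}/{path.name}')
        for path in sorted(Path(local_root).glob('**/*.csv'))
    ]


# In the notebook, each pair could be passed on as dbutils.fs.mv(src, dst).
for src, dst in dbfs_moves('/tmp/safety', 'dbfs:/msh/grab/data'):
    print(src, '->', dst)
```

Because only the file name is kept, files from `features/` and `labels/` end up side by side in the same DBFS directory, as in the original loop.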

In [15]:
%fs
ls /msh/grab/data

path,name,size
dbfs:/msh/grab/data/feature_0.csv,feature_0.csv,198217280
dbfs:/msh/grab/data/feature_1.csv,feature_1.csv,198277557
dbfs:/msh/grab/data/feature_2.csv,feature_2.csv,198235087
dbfs:/msh/grab/data/feature_3.csv,feature_3.csv,198237335
dbfs:/msh/grab/data/feature_4.csv,feature_4.csv,198184081
dbfs:/msh/grab/data/feature_5.csv,feature_5.csv,198228652
dbfs:/msh/grab/data/feature_6.csv,feature_6.csv,198259229
dbfs:/msh/grab/data/feature_7.csv,feature_7.csv,198200323
dbfs:/msh/grab/data/feature_8.csv,feature_8.csv,198280753
dbfs:/msh/grab/data/feature_9.csv,feature_9.csv,198193367


Load the files into a dataframe:

In [17]:
label_schema = types.StructType([
  types.StructField('bookingID', types.StringType(), True),
  types.StructField('label', types.IntegerType(), True)
])

feature_schema = types.StructType([
  types.StructField('bookingID', types.StringType(), True),
  types.StructField('Accuracy', types.DoubleType(), True),
  types.StructField('Bearing', types.DoubleType(), True),
  types.StructField('acceleration_x', types.DoubleType(), True),
  types.StructField('acceleration_y', types.DoubleType(), True),
  types.StructField('acceleration_z', types.DoubleType(), True),
  types.StructField('gyro_x', types.DoubleType(), True),
  types.StructField('gyro_y', types.DoubleType(), True),
  types.StructField('gyro_z', types.DoubleType(), True),
  types.StructField('second', types.DoubleType(), True),
  types.StructField('Speed', types.DoubleType(), True)
])

In [18]:
label_df = spark\
  .read\
  .format('csv')\
  .option('header', 'true')\
  .load('dbfs:/msh/grab/data/label.csv', schema=label_schema)

feature_df = spark\
  .read\
  .format('csv')\
  .option('header', 'true')\
  .load('dbfs:/msh/grab/data/feature_*.csv', schema=feature_schema)

In [19]:
label_df.dtypes, feature_df.dtypes
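
Instead of inspecting the printed types by eye, the result of `dtypes` (a list of `(column, type)` string pairs) can be compared against an expected list. A minimal sketch follows: the helper `dtype_mismatches` and the constant `EXPECTED_LABEL_DTYPES` are hypothetical names, and the sample input at the end is made up for illustration.

```python
# Expected types for the label dataframe, in the short form that `dtypes` reports.
EXPECTED_LABEL_DTYPES = [('bookingID', 'string'), ('label', 'int')]


def dtype_mismatches(actual, expected):
    """Return (column, expected_type, actual_type) for each column that differs."""
    actual_map = dict(actual)
    return [
        (name, dtype, actual_map.get(name))
        for name, dtype in expected
        if actual_map.get(name) != dtype
    ]


# In the notebook: dtype_mismatches(label_df.dtypes, EXPECTED_LABEL_DTYPES)
# Made-up input in which 'label' was read as a string:
print(dtype_mismatches([('bookingID', 'string'), ('label', 'string')],
                       EXPECTED_LABEL_DTYPES))
```

An empty list means every expected column is present with the expected type. A missing column shows up with `None` as its actual type.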

Save the dataframes:

In [21]:
label_df.write.mode('overwrite').parquet('dbfs:/msh/grab/data/label.parquet')
feature_df.write.mode('overwrite').parquet('dbfs:/msh/grab/data/features.parquet')

In [22]:
%fs
ls /msh/grab/data

path,name,size
dbfs:/msh/grab/data/feature_0.csv,feature_0.csv,198217280
dbfs:/msh/grab/data/feature_1.csv,feature_1.csv,198277557
dbfs:/msh/grab/data/feature_2.csv,feature_2.csv,198235087
dbfs:/msh/grab/data/feature_3.csv,feature_3.csv,198237335
dbfs:/msh/grab/data/feature_4.csv,feature_4.csv,198184081
dbfs:/msh/grab/data/feature_5.csv,feature_5.csv,198228652
dbfs:/msh/grab/data/feature_6.csv,feature_6.csv,198259229
dbfs:/msh/grab/data/feature_7.csv,feature_7.csv,198200323
dbfs:/msh/grab/data/feature_8.csv,feature_8.csv,198280753
dbfs:/msh/grab/data/feature_9.csv,feature_9.csv,198193367
