# Download Data

### Test
```
mkdir bronze
cd bronze
mkdir v1.0-test_meta
cd v1.0-trainval_meta
wget https://d36yt3mvayqw5m.cloudfront.net/public/v1.0/v1.0-test_meta.tgz
gunzip v1.0-test_meta.tgz
tar -xf v1.0-test_meta.tar
rm v1.0-test_meta.tar
```

### Trainval
```
mkdir bronze
cd bronze
mkdir v1.0-trainval_meta
cd v1.0-trainval_meta
wget https://d36yt3mvayqw5m.cloudfront.net/public/v1.0/v1.0-trainval_meta.tgz
gunzip v1.0-trainval_meta.tgz
tar -xf v1.0-trainval_meta.tar
rm v1.0-trainval_meta.tar
```

# Import Libraries

In [1]:
import pyspark
from pyspark.sql import types
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

# Create SparkSession

In [2]:
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED") \
    .set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED") \
    .set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED") \
    .set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED") \
    .set("spark.driver.memory", "16g")

sc = SparkContext(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/04/23 01:02:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

# Available files to read

In [4]:
import glob

In [5]:
file_list_trainval = glob.glob('bronze/v1.0-trainval_meta/v1.0-trainval/*')
file_list_test = glob.glob('bronze/v1.0-test_meta/v1.0-test/*')

In [6]:
file_list_trainval

['bronze/v1.0-trainval_meta/v1.0-trainval/sample_annotation.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/map.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/attribute.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/sample.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/visibility.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/ego_pose.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/instance.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/sensor.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/log.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/sample_data.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/category.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/scene.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/calibrated_sensor.json']

In [7]:
file_list_test

['bronze/v1.0-test_meta/v1.0-test/sample_annotation.json',
 'bronze/v1.0-test_meta/v1.0-test/map.json',
 'bronze/v1.0-test_meta/v1.0-test/attribute.json',
 'bronze/v1.0-test_meta/v1.0-test/sample.json',
 'bronze/v1.0-test_meta/v1.0-test/visibility.json',
 'bronze/v1.0-test_meta/v1.0-test/ego_pose.json',
 'bronze/v1.0-test_meta/v1.0-test/instance.json',
 'bronze/v1.0-test_meta/v1.0-test/sensor.json',
 'bronze/v1.0-test_meta/v1.0-test/log.json',
 'bronze/v1.0-test_meta/v1.0-test/sample_data.json',
 'bronze/v1.0-test_meta/v1.0-test/category.json',
 'bronze/v1.0-test_meta/v1.0-test/scene.json',
 'bronze/v1.0-test_meta/v1.0-test/calibrated_sensor.json']

# How to read JSON files with PySpark

In [8]:
# df_category = spark.read.option("multiline", True).json("bronze\\v1.0-test_meta\\v1.0-test\\category.json")

In [9]:
# df_category.show()

In [10]:
# df_visibility = spark.read.option("multiline", True).json("bronze\\v1.0-test_meta\\v1.0-test\\visibility.json")

# Create schema with PySpark

## Schemas

How to create a schema:
```python
from pyspark.sql import types

schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])
```

In [11]:
schema_log = types.StructType([
    types.StructField('token', types.StringType(), True),
    types.StructField('logfile', types.StringType(), True),
    types.StructField('vehicle', types.StringType(), True),
    types.StructField('date_captured', types.DateType(), True),
    types.StructField('location', types.StringType(), True)
])

In [12]:
schema_scene = types.StructType([
    types.StructField('token', types.StringType(), True),
    types.StructField('log_token', types.StringType(), True),
    types.StructField('nbr_samples', types.IntegerType(), True),
    types.StructField('first_sample_token', types.StringType(), True),
    types.StructField('last_sample_token', types.StringType(), True),
    types.StructField('name', types.StringType(), True),
    types.StructField('description', types.StringType(), True)
])

In [13]:
schema_sample = types.StructType([
    types.StructField('token', types.StringType(), True),
    types.StructField('timestamp', types.TimestampType(), True),
    types.StructField('prev', types.StringType(), True),
    types.StructField('next', types.StringType(), True),
    types.StructField('scene_token', types.StringType(), True)
])

In [14]:
schema_sample_data = types.StructType([
    types.StructField('token', types.StringType(), True),
    types.StructField('sample_token', types.StringType(), True),
    types.StructField('ego_pose_token', types.StringType(), True),
    types.StructField('calibrated_sensor_token', types.StringType(), True),
    types.StructField('timestamp', types.TimestampType(), True),
    types.StructField('fileformat', types.StringType(), True),
    types.StructField('is_key_frame', types.BooleanType(), True),
    types.StructField('height', types.IntegerType(), True),
    types.StructField('width', types.IntegerType(), True),
    types.StructField('filename', types.StringType(), True),
    types.StructField('prev', types.StringType(), True),
    types.StructField('next', types.StringType(), True)
])

In [15]:
schema_sample_annotation = types.StructType([
    types.StructField('token', types.StringType(), True),
    types.StructField('sample_token', types.StringType(), True),
    types.StructField('instance_token', types.StringType(), True),
    types.StructField('visibility_token', types.DecimalType(), True),
    types.StructField('attribute_tokens', types.StringType(), True),
    types.StructField('translation', types.StringType(), True),
    types.StructField('size', types.StringType(), True),
    types.StructField('rotation', types.StringType(), True),
    types.StructField('prev', types.StringType(), True),
    types.StructField('next', types.StringType(), True),
    types.StructField('num_lidar_pts', types.IntegerType(), True),
    types.StructField('num_radar_pts', types.IntegerType(), True)
])

In [16]:
schema_instance = types.StructType([
    types.StructField('token', types.StringType(), True),
    types.StructField('category_token', types.StringType(), True),
    types.StructField('nbr_annotations', types.IntegerType(), True),
    types.StructField('first_annotation_token', types.StringType(), True),
    types.StructField('last_annotation_token', types.StringType(), True)
])

## Read JSON files with defined Schema

### Trainval

In [17]:
file_list_trainval

['bronze/v1.0-trainval_meta/v1.0-trainval/sample_annotation.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/map.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/attribute.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/sample.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/visibility.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/ego_pose.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/instance.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/sensor.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/log.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/sample_data.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/category.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/scene.json',
 'bronze/v1.0-trainval_meta/v1.0-trainval/calibrated_sensor.json']

In [18]:
df_log_trainval = spark.read.option("multiline", True).schema(schema_log).json('bronze/v1.0-trainval_meta/v1.0-trainval/log.json')
df_scene_trainval = spark.read.option("multiline", True).schema(schema_scene).json('bronze/v1.0-trainval_meta/v1.0-trainval/scene.json')
df_sample_trainval = spark.read.option("multiline", True).schema(schema_sample).json('bronze/v1.0-trainval_meta/v1.0-trainval/sample.json')
df_sample_data_trainval = spark.read.option("multiline", True).schema(schema_sample_data).json('bronze/v1.0-trainval_meta/v1.0-trainval/sample_data.json')
df_sample_annotation_trainval = spark.read.option("multiline", True).schema(schema_sample_annotation).json('bronze/v1.0-trainval_meta/v1.0-trainval/sample_annotation.json')
df_instance_trainval = spark.read.option("multiline", True).schema(schema_instance).json('bronze/v1.0-trainval_meta/v1.0-trainval/instance.json')

24/04/23 01:02:48 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96RebaseModeInWrite' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.int96RebaseModeInWrite' instead.
24/04/23 01:02:48 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInRead' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.datetimeRebaseModeInRead' instead.
24/04/23 01:02:48 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96RebaseModeInRead' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.int96RebaseModeInRead' instead.
24/04/23 01:02:48 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInWrite' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.datetimeRebaseModeInWrite' instead.
24/04/23 01:02:48 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96RebaseModeInWrite' has been de

### Test

In [19]:
file_list_test

['bronze/v1.0-test_meta/v1.0-test/sample_annotation.json',
 'bronze/v1.0-test_meta/v1.0-test/map.json',
 'bronze/v1.0-test_meta/v1.0-test/attribute.json',
 'bronze/v1.0-test_meta/v1.0-test/sample.json',
 'bronze/v1.0-test_meta/v1.0-test/visibility.json',
 'bronze/v1.0-test_meta/v1.0-test/ego_pose.json',
 'bronze/v1.0-test_meta/v1.0-test/instance.json',
 'bronze/v1.0-test_meta/v1.0-test/sensor.json',
 'bronze/v1.0-test_meta/v1.0-test/log.json',
 'bronze/v1.0-test_meta/v1.0-test/sample_data.json',
 'bronze/v1.0-test_meta/v1.0-test/category.json',
 'bronze/v1.0-test_meta/v1.0-test/scene.json',
 'bronze/v1.0-test_meta/v1.0-test/calibrated_sensor.json']

In [20]:
df_log_test = spark.read.option("multiline", True).schema(schema_log).json('bronze/v1.0-test_meta/v1.0-test/log.json')
df_scene_test = spark.read.option("multiline", True).schema(schema_scene).json('bronze/v1.0-test_meta/v1.0-test/scene.json')
df_sample_test = spark.read.option("multiline", True).schema(schema_sample).json('bronze/v1.0-test_meta/v1.0-test/sample.json')
df_sample_data_test = spark.read.option("multiline", True).schema(schema_sample_data).json('bronze/v1.0-test_meta/v1.0-test/sample_data.json')
df_sample_annotation_test = spark.read.option("multiline", True).schema(schema_sample_annotation).json('bronze/v1.0-test_meta/v1.0-test/sample_annotation.json')
df_instance_test = spark.read.option("multiline", True).schema(schema_instance).json('bronze/v1.0-test_meta/v1.0-test/instance.json')

In [21]:
df_log = df_log_trainval.union(df_log_test)
df_scene = df_scene_trainval.union(df_scene_test)
df_sample = df_sample_trainval.union(df_sample_test)
df_sample_data = df_sample_data_trainval.union(df_sample_data_test)
df_sample_annotation = df_sample_annotation_trainval.union(df_sample_annotation_test)
df_instance = df_instance_trainval.union(df_instance_test)

# Clean Data with PySpark - Silver

In [22]:
df_log = df_log.select('token', 'logfile', 'vehicle','location')

In [23]:
df_scene = df_scene.select('token','log_token','name','description').withColumnRenamed('name', 'scene_name')

In [24]:
df_sample = df_sample.select('token', 'timestamp','scene_token').withColumnRenamed('timestamp', 'timestamp_sample')

In [25]:
df_sample_data = df_sample_data.select('sample_token','ego_pose_token','timestamp','is_key_frame','filename').withColumnRenamed('timestamp', 'timestamp_sample_data')

In [26]:
df_sample_annotation = df_sample_annotation.select('sample_token','instance_token','visibility_token','attribute_tokens')

In [27]:
df_instance = df_instance.select('token','category_token','nbr_annotations')

## Store as Parquet(CSV for test)

In [28]:
# df_log.write.mode('overwrite').option("header",True).csv("silver/sources/log/log.csv")
df_log.write.partitionBy("vehicle").mode('overwrite').parquet("silver/sources/log/")

                                                                                

In [29]:
# df_scene.write.mode('overwrite').option("header",True).csv("silver/sources/scene/scene.csv")
df_scene.coalesce(10).write.mode('overwrite').parquet("silver/sources/scene/")

In [30]:
# df_sample.write.mode('overwrite').option("header",True).csv("silver/sources/sample/sample.csv")
df_sample.write.mode('overwrite').parquet("silver/sources/sample/")

In [31]:
# df_sample_data.write.mode('overwrite').option("header",True).csv("silver/sources/sample_data/sample_data.csv")
df_sample_data.coalesce(10).write.mode('overwrite').parquet("silver/sources/sample_data/")

                                                                                

In [32]:
# df_sample_annotation.write.mode('overwrite').option("header",True).csv("silver/sources/sample_data_annotation/sample_data_annotation.csv")
df_sample_annotation.write.mode('overwrite').parquet("silver/sources/sample_annotation/")

                                                                                

In [33]:
# df_instance.write.mode('overwrite').option("header",True).csv("silver/sources/instance/instance.csv")
df_instance.write.partitionBy("category_token").mode('overwrite').parquet("silver/sources/instance/")

                                                                                

# Clean Data with PySpark - Gold

## Samples

In [34]:
df_samples_join_1 = df_sample_data.filter(df_sample_data.is_key_frame == True) \
            .join(df_sample, df_sample_data.sample_token == df_sample.token, 'inner') \
            .drop('token')

In [35]:
df_samples_join_2 = df_samples_join_1.join(df_scene, df_samples_join_1.scene_token == df_scene.token, 'inner') \
                .drop('token')

In [36]:
df_samples_join_3 = df_samples_join_2.join(df_log, df_samples_join_2.log_token == df_log.token, 'inner') \
                .drop('token')

In [37]:
samples = df_samples_join_3.select('sample_token',
                                  'logfile',
                                  'vehicle', 
                                  'location', 
                                  'scene_name', 
                                  'description',
                                  'timestamp_sample_data',
                                  'ego_pose_token')

In [38]:
samples.printSchema()

root
 |-- sample_token: string (nullable = true)
 |-- logfile: string (nullable = true)
 |-- vehicle: string (nullable = true)
 |-- location: string (nullable = true)
 |-- scene_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- timestamp_sample_data: timestamp (nullable = true)
 |-- ego_pose_token: string (nullable = true)



In [39]:
samples.show(2,truncate=False, vertical=True)

[Stage 9:>                                                          (0 + 1) / 1]

-RECORD 0--------------------------------------------------------------
 sample_token          | e93e98b63d3b40209056d129dc53ceee              
 logfile               | n015-2018-07-18-11-07-57+0800                 
 vehicle               | n015                                          
 location              | singapore-onenorth                            
 scene_name            | scene-0001                                    
 description           | Construction, maneuver between several trucks 
 timestamp_sample_data | +27440-07-13 09:34:45.215872                  
 ego_pose_token        | bddd80ae33ec4e32b27fdb3c1160a30e              
-RECORD 1--------------------------------------------------------------
 sample_token          | 14d5adfe50bb4445bc3aa5fe607691a8              
 logfile               | n015-2018-07-18-11-07-57+0800                 
 vehicle               | n015                                          
 location              | singapore-onenorth                     

                                                                                

In [40]:
samples.write.partitionBy("vehicle").mode('overwrite').parquet("gold/samples/")

                                                                                

## Objects

In [41]:
df_category = spark.read.parquet("silver/seeds/category.parquet")
df_visibility = spark.read.parquet("silver/seeds/visibility.parquet")

In [42]:
df_objects_join_1 = df_sample_annotation.join(df_instance, df_sample_annotation.instance_token == df_instance.token,'inner') \
                    .drop('token', 'instance_token')

In [43]:
df_objects_join_2 = df_objects_join_1.join(df_category, df_objects_join_1.category_token == df_category.uid, 'inner') \
                    .withColumnRenamed('description', 'category_description') \
                    .drop('uid', 'category_token')

In [44]:
objects = df_objects_join_2.join(df_visibility, df_objects_join_2.visibility_token == df_visibility.uid, 'inner') \
                    .drop('uid', 'visibility_token', 'description', 'attribute_tokens')

In [45]:
objects.write.partitionBy("object").mode('overwrite').parquet("gold/objects/")

                                                                                

In [47]:
spark.stop()