In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# returns the active SparkSession, creating one if none exists yet

In [2]:
from pyspark.sql import functions as F

In [3]:
from jyquickhelper import add_notebook_menu
from progressbar import ProgressBar

In [4]:
add_notebook_menu()

# Build dataset

The purpose of this notebook is to build a single dataframe from the files `lieux_yyyy.csv`, `vehicules_yyyy.csv`, `caracteristiques_yyyy.csv` and `usagers_yyyy.csv` (with `yyyy` ranging from 2005 to 2016) available on <data.gouv.fr>.

## Put data on the cluster 

Although we are actually working locally, keep in mind that on a cluster, having the data on the notebook server is not enough for the workers to access it. Before any distributed computation, the data must therefore be copied to the cluster's storage, which is managed by the Hadoop Distributed File System (HDFS).

The following snippet prints the `hadoop fs -put` commands that would do so.

```python
years = list(range(2005, 2017))  # 2005 to 2016 inclusive
fileId = ['vehicules','lieux','usagers','caracteristiques']
for file in fileId:
    for year in years:
        path1 = './Projets/dataSample/'+ file + '_' + str(year) +'.csv'
        path2 = './dataSample/'+ file + '_' + str(year) +'.csv'
        print('!hadoop fs -put', path1, path2)
```  
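
As an alternative to printing the commands and pasting them into cells, the same upload can be sketched with `subprocess`. This is a minimal sketch, assuming the `hadoop` CLI is on the `PATH` and that the directories are the ones used in the snippet above; `build_put_commands` and `upload` are hypothetical helper names, and the year range covers 2005 to 2016 as stated in the introduction.

```python
import subprocess

FILE_IDS = ['vehicules', 'lieux', 'usagers', 'caracteristiques']
YEARS = range(2005, 2017)  # 2005 to 2016 inclusive


def build_put_commands(src_dir='./Projets/dataSample', dst_dir='./dataSample'):
    """Return one `hadoop fs -put` argument list per (file, year) pair."""
    commands = []
    for file_id in FILE_IDS:
        for year in YEARS:
            name = '{}_{}.csv'.format(file_id, year)
            commands.append(['hadoop', 'fs', '-put',
                             src_dir + '/' + name, dst_dir + '/' + name])
    return commands


def upload(commands, dry_run=True):
    """Print each command; run it only when dry_run is False."""
    for cmd in commands:
        print(' '.join(cmd))
        if not dry_run:
            subprocess.run(cmd, check=True)  # raises if hadoop exits non-zero


commands = build_put_commands()
upload(commands[:2])  # dry run: prints the first two commands only
```

Keeping `dry_run=True` by default means nothing is sent to HDFS until the printed commands have been checked.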

## Unique dataset

**Step 1**: We build 4 dataframes using the different csv files and store them in a dictionary `result`.

In [5]:
years = list(range(2005,2017))
fileId = ['vehicules','lieux','usagers','caracteristiques']
result = {}
pbar = ProgressBar()

for file in pbar(fileId):
    if file in result:
        # skip files that are already loaded, so that they are not stacked twice
        print(file + ' already exists')
    else:
        for year in years:
            path = './dataSample/' + file + '_' + str(year) + '.csv'
            # print(path)
            df_new = spark.read.csv(path, header=True, sep=",", inferSchema=True)
            if file not in result:
                # first year: start the dataframe
                result[file] = df_new
            else:
                # following years: stack below the rows read so far
                result[file] = result[file].union(df_new)

100% (4 of 4) |##########################| Elapsed Time: 0:00:34 Time:  0:00:34
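
`DataFrame.union` matches columns by position, not by name, so the loop above relies on every yearly file listing its columns in the same order. The sketch below shows a name-based variant using `functools.reduce` and `DataFrame.unionByName` (available from Spark 2.3). `yearly_paths` and `stack_by_name` are hypothetical helper names, and whether the yearly files actually differ in column order is not checked here.

```python
from functools import reduce


def yearly_paths(file_id, years, base_dir='./dataSample'):
    """Build the list of yearly CSV paths for one file family."""
    return ['{}/{}_{}.csv'.format(base_dir, file_id, year) for year in years]


def stack_by_name(frames):
    """Union DataFrames pairwise, matching columns by name.

    Relies on DataFrame.unionByName (Spark 2.3+); raises on an empty input.
    """
    frames = list(frames)
    if not frames:
        raise ValueError('no dataframes to stack')
    return reduce(lambda left, right: left.unionByName(right), frames)


# Intended use in the notebook (assumes the `spark` session defined earlier):
# frames = [spark.read.csv(p, header=True, inferSchema=True)
#           for p in yearly_paths('lieux', range(2005, 2017))]
# result['lieux'] = stack_by_name(frames)
```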


**Check 1** : Before going any further, we display the first few rows of each dataframe, together with their schemas and basic statistics (number of rows and columns).

In [6]:
for key in result.keys():
    print("{0} has {1} variables and {2} rows".\
          format(key, len(result[key].columns),result[key].count() ))
    result[key].show(5)
    result[key].printSchema()

vehicules has 9 variables and 1433389 rows
+------------+----+----+------+---+----+----+----+-------+
|     Num_Acc|senc|catv|occutc|obs|obsm|choc|manv|num_veh|
+------------+----+----+------+---+----+----+----+-------+
|200500000001|   0|   7|     0|  0|   2|   1|   1|    A01|
|200500000001|   0|   7|     0|  0|   2|   8|  10|    B02|
|200500000002|   0|   7|     0|  0|   2|   7|  16|    A01|
|200500000002|   0|   2|     0|  0|   2|   1|   1|    B02|
|200500000003|   0|   2|     0|  0|   2|   1|   1|    A01|
+------------+----+----+------+---+----+----+----+-------+
only showing top 5 rows

root
 |-- Num_Acc: long (nullable = true)
 |-- senc: integer (nullable = true)
 |-- catv: integer (nullable = true)
 |-- occutc: integer (nullable = true)
 |-- obs: integer (nullable = true)
 |-- obsm: integer (nullable = true)
 |-- choc: integer (nullable = true)
 |-- manv: integer (nullable = true)
 |-- num_veh: string (nullable = true)

lieux has 18 variables and 839985 rows
+------------+----+-

**Step 2** : We join the 4 dataframes on `Num_Acc` and store the result in `df`.


In [47]:
df_lieux = result['lieux']
df_carac = result['caracteristiques']
df_usagers = result['usagers']
df_vehicules = result['vehicules']

to_join = [df_carac,df_usagers ,df_vehicules]

# join on Num_Acc; columns that already exist in df get a trailing '_' to avoid duplicate names
df= df_lieux
key = 'Num_Acc'
for df_right in to_join:
    common_colname = [col for col in df_right.columns if col in df.columns ]
    common_colname.remove(key)    
    if common_colname:
        for duplicate in common_colname:
            df_right = df_right.withColumnRenamed(duplicate, duplicate +'_')
    df = df.join(df_right , on=key, how='inner')
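
To make the renaming rule explicit, the sketch below isolates it as a plain function over lists of column names, so it can be checked without a Spark session. `rename_plan` is a hypothetical helper, and the left-hand column list in the usage lines is made up for illustration; only the `vehicules` columns come from the output of Check 1.

```python
def rename_plan(left_cols, right_cols, key='Num_Acc', suffix='_'):
    """Map each right-hand column that collides with a left-hand column
    (other than the join key) to its suffixed name."""
    left = set(left_cols)
    return {col: col + suffix
            for col in right_cols
            if col in left and col != key}


# Columns of `vehicules` as printed by Check 1.
vehicules_cols = ['Num_Acc', 'senc', 'catv', 'occutc', 'obs',
                  'obsm', 'choc', 'manv', 'num_veh']
# Hypothetical left-hand side that already holds a `num_veh` column.
left_cols = ['Num_Acc', 'grav', 'num_veh']

plan = rename_plan(left_cols, vehicules_cols)
print(plan)  # {'num_veh': 'num_veh_'}
```

Each entry of the returned mapping corresponds to one `withColumnRenamed(old, new)` call on the right-hand dataframe. Unlike `list.remove(key)`, this version does not raise when the key is absent from the right-hand columns.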

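**Check 2** : We check that all years are actually present in the dataframe `df`. The column `an` stores the year as an offset from 2000, so `5` stands for 2005 and `16` for 2016.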

In [48]:
df.groupBy('an').count().sort("an", ascending=True).show()

+---+------+
| an| count|
+---+------+
|  5|374561|
|  6|362507|
|  7|356228|
|  8|322196|
|  9|311706|
| 10|288112|
| 11|281675|
| 12|263194|
| 13|242163|
| 14|248642|
| 15|245706|
| 16|257286|
+---+------+



## Reduced dataset

For the sake of simplicity, we keep only the variables of interest for data exploration and for predicting the severity of accidents (`grav`). The interested reader should refer to the online documentation for more information on each of these variables.

In [51]:
col_keep = ["grav", "Num_Acc", "num_veh", "mois", "an", "hrmn", "lum", "dep",
          "com", "agg", "int", "atm", "col", "catr", "circ", 
          "nbv", "prof", "plan", "larrout", "surf", "infra", 
          "situ", "catv", "obs", "obsm", "choc", "manv", "place",
         "catu", "sexe", "An_nais", "secu"]

df_keep = df.select([F.col(c) for c in col_keep])

**Save** : We save `df_keep`, which will be the basis of future investigations. We also save `df_vehicules`.

In [66]:
df_keep.\
repartition(1).\
write.format("com.databricks.spark.csv").\
option("header","true").\
save("df_keep")

In [11]:
df_vehicules.\
repartition(1).\
write.format("com.databricks.spark.csv").\
option("header","true").\
save("df_vehicules")

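Rearranging files: Spark writes each output as a directory containing a single `part-*.csv` file, which we move to `dataSample/` under a readable name.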

In [67]:
ls df_keep/

part-00000-c2354000-3b69-4a88-9666-0cacf6a6bfeb-c000.csv  _SUCCESS


In [14]:
ls df_vehicules/

part-00000-39bf1862-fc34-4df5-bfa8-5611593cea3c-c000.csv  _SUCCESS


In [68]:
mv df_keep/part-00000-c2354000-3b69-4a88-9666-0cacf6a6bfeb-c000.csv dataSample/df_keep.csv
mv df_vehicules/part-00000-39bf1862-fc34-4df5-bfa8-5611593cea3c-c000.csv dataSample/df_vehicules.csv

In [75]:
!rm -rf df_keep/
!rm -rf df_vehicules
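
The part-file names contain a random identifier, so the `mv` commands above have to be edited after every run. Below is a minimal sketch of the same rearrangement in Python, assuming Spark wrote exactly one `part-*.csv` file per output directory (which `repartition(1)` is meant to ensure); `collect_single_csv` is a hypothetical helper name.

```python
import glob
import os
import shutil


def collect_single_csv(spark_dir, target_path, cleanup=True):
    """Move the single part-*.csv file out of a Spark output directory."""
    parts = glob.glob(os.path.join(spark_dir, 'part-*.csv'))
    if len(parts) != 1:
        raise RuntimeError('expected one part file in {}, found {}'
                           .format(spark_dir, len(parts)))
    target_dir = os.path.dirname(target_path)
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    shutil.move(parts[0], target_path)
    if cleanup:
        shutil.rmtree(spark_dir)  # removes _SUCCESS and any leftover files
    return target_path


# Intended use, mirroring the cells above:
# collect_single_csv('df_keep', 'dataSample/df_keep.csv')
# collect_single_csv('df_vehicules', 'dataSample/df_vehicules.csv')
```

Failing loudly when the directory holds zero or several part files avoids silently moving an incomplete export.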

**Output** : 
- dataSample/df_keep.csv - contains only relevant variables for analysis and `grav` prediction
- dataSample/df_vehicules.csv - used in data exploration