01 Prepare Spark environment

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=f19645a4a3e9bae4fd45a107c1e94184d902e2a00d38a0c22747861a7c694897
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Start (or reuse) a Spark session whose master is "local", then fetch the
# SparkContext that the RDD API needs.
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)
sc = SparkContext.getOrCreate()

In [None]:
# Smoke test for the PySpark install: count the words of a one-line RDD.
rdd = sc.parallelize(["Hello Spark"])

counts = (
    rdd.flatMap(lambda text: text.split(" "))       # one element per word
    .map(lambda token: (token, 1))                  # pair each word with a 1
    .reduceByKey(lambda left, right: left + right)  # sum the 1s per word
    .collect()                                      # bring the pairs back as a list
)
print(counts)

[('Hello', 1), ('Spark', 1)]


02 Connect to the dataset in Google Drive, then load and display the attached data file

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive
from google.colab import drive
drive.mount('/content/drive')

# List the "Colab Notebooks" folder to check which files are visible there
!ls "/content/drive/My Drive/Colab Notebooks"

Mounted at /content/drive
'Copy of Drive FUSE example.ipynb'
 position-des-bus-en-circulation-sur-le-reseau-star-en-temps-reel.csv
 Pyspark01.ipynb


In [None]:
# Path of the CSV dataset inside the mounted Drive folder
csv_path = (
    '/content/drive/My Drive/Colab Notebooks/'
    'position-des-bus-en-circulation-sur-le-reseau-star-en-temps-reel.csv'
)

# Load the semicolon-separated file: first row is the header, and Spark
# infers the column types.
df = spark.read.csv(csv_path, header=True, sep=';', inferSchema=True)

# Print the first rows, then display the column names as the cell result
df.show()
df.columns

+----------+------------+------------+----------+-----------------+------------+--------------------+-------------------+---------------+
|  Bus (ID)|Bus (numéro)|        Etat|Ligne (ID)|Ligne (nom court)|Code du sens|         Destination|        Coordonnées|Avance / Retard|
+----------+------------+------------+----------+-----------------+------------+--------------------+-------------------+---------------+
| 149722672|   149722672|    En ligne|         5|               C5|           1|              Patton|48.126706,-1.665295|           1218|
| 146720212|   146720212|Hors-service|      null|             null|        null|                null|48.110314,-1.642594|           null|
|1205787936|  1205787936|Hors-service|      null|             null|        null|                null|48.110638,-1.642596|           null|
| 100682492|   100682492|    En ligne|        34|               34|           0|           Chantepie|48.099402,-1.620724|            -54|
|1203586132|  1203586132|Hors-serv

['Bus (ID)',
 'Bus (numéro)',
 'Etat',
 'Ligne (ID)',
 'Ligne (nom court)',
 'Code du sens',
 'Destination',
 'Coordonnées',
 'Avance / Retard']

03 Clean the dataset and export it as a CSV file

In [16]:
# importing python reduce
from functools import reduce

def clean_csv(df, new_columns, path_csv, mode="errorifexists"):
    """Rename every column of a DataFrame by position and write it out as CSV.

    Keyword arguments:
    df -- the Spark DataFrame to rename and export
    new_columns -- the list of renamed columns, in the same order as
        ``df.columns``; entries beyond the number of columns are ignored
    path_csv -- the output location handed to the DataFrame CSV writer
    mode -- the save mode handed to the writer; the default fails when the
        output already exists, pass "overwrite" to allow re-running

    Returns the renamed DataFrame.
    Raises IndexError when ``new_columns`` has fewer entries than ``df`` has
    columns.
    """
    column_count = len(df.columns)
    if len(new_columns) < column_count:
        raise IndexError(
            "new_columns has %d names but the DataFrame has %d columns"
            % (len(new_columns), column_count)
        )

    # Rename all columns in a single positional step. Renaming them one at a
    # time by name goes wrong when a new name equals a column that has not
    # been renamed yet (e.g. swapping two names): the later rename then
    # matches both columns.
    data = df.toDF(*new_columns[:column_count])
    data.printSchema()

    # output with CSV options : header row, comma delimiter
    data.write.options(header='True', delimiter=',') \
        .mode(mode) \
        .csv(path_csv)
    return data

# TODO: translate into English
new_columns =["Bus (ID)","Bus (number)","Status","Line (ID)","Line (short name)","Direction code","Destination","Coordinates","Advance / Delay"]

new_data = clean_csv(df,new_columns,"/content/drive/My Drive/Colab Notebooks/cleaned-real-time-bus")

!ls "/content/drive/My Drive/Colab Notebooks/cleaned-real-time-bus"


root
 |-- Bus (ID): integer (nullable = true)
 |-- Bus (number): integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- Line (ID): integer (nullable = true)
 |-- Line (short name): string (nullable = true)
 |-- Direction code: integer (nullable = true)
 |-- Destination: string (nullable = true)
 |-- Coordinates: string (nullable = true)
 |-- Advance / Delay: integer (nullable = true)

part-00000-ff1db904-c324-42bc-ac9a-aeeeae987ee7-c000.csv  _SUCCESS
