# Real Time Data Streaming - ESME

In [None]:
# Instancier spark
import os
os.environ["SPARK_HOME"] = "/workspaces/real_time_data_streaming/spark-3.2.3-bin-hadoop2.7"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /workspaces/real_time_data_streaming/spark-streaming-kafka-0-10-assembly_2.12-3.2.3.jar pyspark-shell'
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
import findspark
findspark.init()
from pyspark.sql import SparkSession

packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.3',
    'org.apache.kafka:kafka-clients:3.2.3'
]
spark = (SparkSession.builder
   .config("spark.jars.packages", ",".join(packages))
   .config("spark.sql.repl.eagerEval.enabled", True)
   .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
spark

In [None]:
# Télécharger les données
! wget https://jacobceles.github.io/knowledge_repo/colab_and_pyspark/cars.csv

In [None]:
# Lire les données en dataframe
df = spark.read.csv('cars.csv', header=True, sep=";")
df.show(5)

In [None]:
# Afficher les première lignes
df.show(5, truncate=False)

In [None]:
# Afficher les première lignes
df.limit(5)

In [None]:
# Afficher les noms de colonnes
df.columns

In [None]:
# Afficher le type des colonnes
df.printSchema()

In [None]:
# Définir explicitement les types de données
from pyspark.sql.types import *
df.columns

labels = [
     ('Car',StringType()),
     ('MPG',DoubleType()),
     ('Cylinders',IntegerType()),
     ('Displacement',DoubleType()),
     ('Horsepower',DoubleType()),
     ('Weight',DoubleType()),
     ('Acceleration',DoubleType()),
     ('Model',IntegerType()),
     ('Origin',StringType())
]

schema = StructType([StructField (x[0], x[1], True) for x in labels])
df = spark.read.csv('cars.csv', header=True, sep=";", schema=schema)
df.printSchema()

In [None]:
df.show(truncate=False)

In [None]:
# Afficher juste une colonne
df.select("Car").show(truncate=False)

In [None]:
# Selectionner deux colonnes
df.select("Car", "Cylinders").show(truncate=False)

In [None]:
# Créer une nouvelle colonne ayant la veleur 1
from pyspark.sql.functions import lit
df = df.withColumn('first_column',lit(1)) 
df.show(5,truncate=False)

In [None]:
# Créer une nouvelle colonne ayant la valeur 2
df = df.withColumn('second_column', lit(2))
df.show(5,truncate=False)

In [None]:
# Concatener deux colonnes pour en créer une seule
from pyspark.sql.functions import concat, col
df = df.withColumn('car_model', concat(col("Car"), lit(" "), col("model")))

In [None]:
# Renommer des colonnes
df = df.withColumnRenamed('first_column', 'new_column_one') \
       .withColumnRenamed('second_column', 'new_column_two')
df.show(truncate=False)

In [None]:
# Compter le nombre de voiture suivant leur origne
df.groupBy('Origin').count().show(5)

In [None]:
# Compter le nombre de voitures suivant leur orignes et model
df.groupBy('Origin', 'Model').count().show(5)

In [None]:
#Supprimer une colonne
df = df.drop('new_column_one')
df.show(5,truncate=False)

In [None]:
# Selectionner uniquement les voitures européennes
europe_filtered= df.filter(col('Origin')=='Europe')
europe_filtered.show(5,truncate=False)

In [None]:
# Trier les données
df.orderBy('Cylinders').show(truncate=False) 

In [None]:
# Créons deux données fictives
cars_df = spark.createDataFrame([[1, 'Car A'],[2, 'Car B'],[3, 'Car C']], ["id", "car_name"])
car_price_df = spark.createDataFrame([[1, 1000],[2, 2000],[3, 3000]], ["id", "car_price"])
cars_df.show()
car_price_df.show()

In [None]:
# Faisons la jointure entre ces deux données
cars_df.join(car_price_df, on="id", how='inner').show(truncate=False)