In [2]:
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialiser SparkSession
spark = SparkSession.builder.appName("DataFrameExample").getOrCreate()

# Définir le schéma
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])

# Données
data = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1)
]

# Créer DataFrame
df = spark.createDataFrame(data, schema)

# Afficher schéma et données
df.printSchema()
df.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M|  3000|
|  Michael|      Rose|        |40288|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



In [7]:
df_renamed = df.withColumnRenamed("id", "identifiant")
df_renamed.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- identifiant: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [8]:
df_femmes = df.filter(df.gender == "F")
df_femmes.show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



In [9]:
df.orderBy(["lastname", "firstname"]).show()


+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|  Michael|      Rose|        |40288|     M|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|    James|          |   Smith|36636|     M|  3000|
|   Robert|          |Williams|42114|     M|  4000|
+---------+----------+--------+-----+------+------+



In [10]:
df.sort("lastname", "firstname").show()


+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|  Michael|      Rose|        |40288|     M|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|    James|          |   Smith|36636|     M|  3000|
|   Robert|          |Williams|42114|     M|  4000|
+---------+----------+--------+-----+------+------+



In [11]:
from pyspark.sql import SparkSession

# Initialiser la session Spark
spark = SparkSession.builder.appName("JoinExample").getOrCreate()

# Définir le premier DataFrame
data1 = [("A101", "John"), ("A102", "Peter"), ("A103", "Charlie")]
columns1 = ["emp_id", "emp_name"]
dataframe_1 = spark.createDataFrame(data1, columns1)

# Définir le deuxième DataFrame
data2 = [("A101", 3250), ("A102", 6735), ("A103", 8650)]
columns2 = ["emp_id", "salary"]
dataframe_2 = spark.createDataFrame(data2, columns2)

# Jointure interne (INNER JOIN)
combined_df = dataframe_1.join(dataframe_2, on="emp_id", how="inner")

# Afficher les résultats sous forme de liste de Row
combined_df.collect()


[Row(emp_id='A101', emp_name='John', salary=3250),
 Row(emp_id='A102', emp_name='Peter', salary=6735),
 Row(emp_id='A103', emp_name='Charlie', salary=8650)]

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/jovyan/people.json.

In [14]:
from pyspark.sql import SparkSession

# Créer une session Spark
spark = SparkSession.builder.appName("PeopleExample").getOrCreate()

# Lire le fichier JSON (remplace 'people.json' par le chemin correct)
df = spark.read.json("people.json")

# Afficher le contenu du DataFrame
df.show()

# Afficher le schéma du DataFrame
df.printSchema()

# Enregistrer comme vue temporaire pour requêtes SQL
df.createOrReplaceTempView("people")


+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [15]:
pip install pandas findspark 

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Note: you may need to restart the kernel to use updated packages.


In [16]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
import pandas as pd

# Créer une session Spark
spark = SparkSession.builder.appName("CovidAnalysis").getOrCreate()

# Vérification de la session
print(spark.version)


3.5.0


In [17]:
# Lecture du fichier COVID-19
vaccination_data = pd.read_csv("covid-latest.csv")  # Remplace par le vrai chemin

# Afficher les 5 premières lignes
print(vaccination_data.head())

   Unnamed: 0  iso_code continent        location last_updated_date  \
0           0       AFG      Asia     Afghanistan        2024-08-04   
1           1  OWID_AFR       NaN          Africa        2024-08-04   
2           2       ALB    Europe         Albania        2024-08-04   
3           3       DZA    Africa         Algeria        2024-08-04   
4           4       ASM   Oceania  American Samoa        2024-08-04   

   total_cases  new_cases  new_cases_smoothed  total_deaths  new_deaths  ...  \
0     235214.0        0.0               0.000        7998.0         0.0  ...   
1   13145380.0       36.0               5.143      259117.0         0.0  ...   
2     335047.0        0.0               0.000        3605.0         0.0  ...   
3     272139.0       18.0               2.571        6881.0         0.0  ...   
4       8359.0        0.0               0.000          34.0         0.0  ...   

   male_smokers  handwashing_facilities  hospital_beds_per_thousand  \
0           NaN      

In [18]:
selected_data = vaccination_data[['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']]
print(selected_data.head())


  continent  total_cases  total_deaths  total_vaccinations    population
0      Asia     235214.0        7998.0                 NaN  4.112877e+07
1       NaN   13145380.0      259117.0                 NaN  1.426737e+09
2    Europe     335047.0        3605.0                 NaN  2.842318e+06
3    Africa     272139.0        6881.0                 NaN  4.490323e+07
4   Oceania       8359.0          34.0                 NaN  4.429500e+04


In [20]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Nettoyer les données pour Spark
selected_data = selected_data.fillna(0)
selected_data = selected_data.astype({
    'continent': str,
    'total_cases': float,
    'total_deaths': float,
    'total_vaccinations': float,
    'population': float
})

# Définir le schéma pour Spark
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", DoubleType(), True),
    StructField("total_deaths", DoubleType(), True),
    StructField("total_vaccinations", DoubleType(), True),
    StructField("population", DoubleType(), True)
])

# Convertir en DataFrame Spark
spark_df = spark.createDataFrame(selected_data, schema=schema)


In [21]:
spark_df.printSchema()


root
 |-- continent: string (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- total_vaccinations: double (nullable = true)
 |-- population: double (nullable = true)



In [22]:
spark_df.select("continent", "total_cases", "total_deaths").show(5)

# Filtrer les données où les cas dépassent 1 million
spark_df.filter(spark_df.total_cases > 1_000_000).select("continent", "total_cases").show()


+---------+-----------+------------+
|continent|total_cases|total_deaths|
+---------+-----------+------------+
|     Asia|   235214.0|      7998.0|
|        0| 1.314538E7|    259117.0|
|   Europe|   335047.0|      3605.0|
|   Africa|   272139.0|      6881.0|
|  Oceania|     8359.0|        34.0|
+---------+-----------+------------+
only showing top 5 rows

+-------------+------------+
|    continent| total_cases|
+-------------+------------+
|            0|  1.314538E7|
|South America| 1.0101218E7|
|            0|3.01499099E8|
|      Oceania| 1.1861161E7|
|       Europe|   6082444.0|
|         Asia|   2051348.0|
|       Europe|   4872829.0|
|South America|   1212147.0|
|South America| 3.7511921E7|
|       Europe|   1329988.0|
|North America|   4819055.0|
|South America|   5401126.0|
|         Asia| 9.9373219E7|
|South America|   6391876.0|
|North America|   1234701.0|
|       Europe|   1317144.0|
|North America|   1113662.0|
|       Europe|   4761919.0|
|       Europe|   3435679.0|
|Sou

In [23]:
from pyspark.sql.functions import col, format_number, concat, lit

# Calculer le taux de mortalité
spark_df = spark_df.withColumn(
    "mortality_rate",
    format_number((col("total_deaths") / col("population")) * 100, 2)
)

# Ajouter le symbole %
spark_df = spark_df.withColumn(
    "mortality_rate",
    concat(col("mortality_rate"), lit("%"))
)

# Afficher quelques lignes
spark_df.select("continent", "total_deaths", "population", "mortality_rate").show(5)


+---------+------------+-------------+--------------+
|continent|total_deaths|   population|mortality_rate|
+---------+------------+-------------+--------------+
|     Asia|      7998.0|  4.1128772E7|         0.02%|
|        0|    259117.0|1.426736614E9|         0.02%|
|   Europe|      3605.0|    2842318.0|         0.13%|
|   Africa|      6881.0|  4.4903228E7|         0.02%|
|  Oceania|        34.0|      44295.0|         0.08%|
+---------+------------+-------------+--------------+
only showing top 5 rows



In [24]:
!pip install pandas
!pip install pyspark
!pip install findspark

