In [1]:
from pyspark.sql import SparkSession

# Create SparkSession with Avro package explicitly set
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.0") \
    .getOrCreate()

# Print loaded packages to verify Avro package inclusion
print(spark.sparkContext.getConf().get("spark.jars.packages"))


None


24/02/22 13:56:47 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
from pyspark.sql.types import StructType, StructField, FloatType, StringType, IntegerType
schema=StructType([
    StructField("PassengerId", IntegerType(), True),
    StructField("Survived", IntegerType(), True),
    StructField("Pclass", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Sex", StringType(), True),
    StructField("Age", FloatType(), True),
    StructField("SibSp", IntegerType(), True),
    StructField("Parch", IntegerType(), True),
    StructField("Ticket", StringType(), True),
    StructField("Fare", FloatType(), True),
    StructField("Cabin", StringType(), True),
    StructField("Embarked", StringType(), True),
])
df = spark.read.option("header", True).csv("inputs/Titanic-Dataset.csv", schema=schema)
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [3]:
df.show(truncate=True)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [4]:
df = df.select(df["PassengerId"], df["Name"], df["Sex"], df["Age"], df["Fare"])

In [5]:
df.show(truncate=False)

+-----------+-------------------------------------------------------+------+----+-------+
|PassengerId|Name                                                   |Sex   |Age |Fare   |
+-----------+-------------------------------------------------------+------+----+-------+
|1          |Braund, Mr. Owen Harris                                |male  |22.0|7.25   |
|2          |Cumings, Mrs. John Bradley (Florence Briggs Thayer)    |female|38.0|71.2833|
|3          |Heikkinen, Miss. Laina                                 |female|26.0|7.925  |
|4          |Futrelle, Mrs. Jacques Heath (Lily May Peel)           |female|35.0|53.1   |
|5          |Allen, Mr. William Henry                               |male  |35.0|8.05   |
|6          |Moran, Mr. James                                       |male  |NULL|8.4583 |
|7          |McCarthy, Mr. Timothy J                                |male  |54.0|51.8625|
|8          |Palsson, Master. Gosta Leonard                         |male  |2.0 |21.075 |
|9        

In [6]:
# delete row with nulls values
df = df.na.drop(subset=["Sex", "Age"])

### Utiliser une Window + Row number => pour trouver l'homme et la femme les plus agés

In [7]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col
windowSex = Window.partitionBy("Sex").orderBy(col("Age").desc())
df.withColumn("row_number", row_number().over(windowSex)) \
    .filter(col("row_number") == 1) \
    .show(truncate=False)

+-----------+------------------------------------+------+----+-------+----------+
|PassengerId|Name                                |Sex   |Age |Fare   |row_number|
+-----------+------------------------------------+------+----+-------+----------+
|276        |Andrews, Miss. Kornelia Theodosia   |female|63.0|77.9583|1         |
|631        |Barkworth, Mr. Algernon Henry Wilson|male  |80.0|30.0   |1         |
+-----------+------------------------------------+------+----+-------+----------+



### écrire au format parquet et avro les passagers de plus de 40 ans dans le format : id, name, age, fare

In [8]:
df_above_40_years_old = df.filter(col("Age") > 40).drop("Sex")
df_above_40_years_old.show()

+-----------+--------------------+----+-------+
|PassengerId|                Name| Age|   Fare|
+-----------+--------------------+----+-------+
|          7|McCarthy, Mr. Tim...|54.0|51.8625|
|         12|Bonnell, Miss. El...|58.0|  26.55|
|         16|Hewlett, Mrs. (Ma...|55.0|   16.0|
|         34|Wheadon, Mr. Edwa...|66.0|   10.5|
|         36|Holverson, Mr. Al...|42.0|   52.0|
|         53|Harper, Mrs. Henr...|49.0|76.7292|
|         55|Ostby, Mr. Engelh...|65.0|61.9792|
|         63|Harris, Mr. Henry...|45.0| 83.475|
|         93|Chaffee, Mr. Herb...|46.0| 61.175|
|         95|   Coxon, Mr. Daniel|59.0|   7.25|
|         97|Goldschmidt, Mr. ...|71.0|34.6542|
|        111|Porter, Mr. Walte...|47.0|   52.0|
|        117|Connors, Mr. Patrick|70.5|   7.75|
|        125|White, Mr. Perciv...|54.0|77.2875|
|        130|  Ekstrom, Mr. Johan|45.0|  6.975|
|        133|Robins, Mrs. Alex...|47.0|   14.5|
|        150|Byles, Rev. Thoma...|42.0|   13.0|
|        151|Bateman, Rev. Rob...|51.0| 

In [14]:
df_above_40_years_old.write.parquet("outputs/people_above_40_years_old")

In [11]:
# ça marche pas
df_above_40_years_old.write.format("org.apache.spark:spark-avro_2.12:3.5.0").save("outputs/people_above_40_years_old.avro")

Py4JJavaError: An error occurred while calling o131.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: org.apache.spark:spark-avro_2.12:3.5.0. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:724)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:863)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:257)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.ClassNotFoundException: org.apache.spark:spark-avro_2.12:3.5.0.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 16 more


### Les même données dans un format parquet mais en utilisant des dossiers => Partitionnement par sex

In [13]:
df.filter(col("Age") > 40).write.partitionBy("Sex").parquet("outputs/people_above_40_per_sex")