# Projet

## Imports & lecture des fichiers

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# One Spark session for the whole notebook.
spark = SparkSession.builder.appName('abc').getOrCreate()

# Both inputs are CSV files with a header row; column types are inferred.
# df1: per-country product prices (Model_name, Price, Country, Currency).
# df2: currency conversion table (ISO_4217, Dollar_To_Curr_Ratio).
df1 = spark.read.csv('ApplePrices.csv', header=True, inferSchema=True)
df2 = spark.read.csv('CurrencyConversion.csv', header=True, inferSchema=True)

# Quick look at the raw inputs.
df1.show()
df2.show()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/19 10:12:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+--------------------+-------+---------+--------+
|          Model_name|  Price|  Country|Currency|
+--------------------+-------+---------+--------+
|        24-inch iMac|1919.01|Australia|     AUD|
|         AirPods Max| 908.47|Australia|     AUD|
|         AirPods Pro|  403.2|Australia|     AUD|
|AirPods(2nd gener...| 221.31|Australia|     AUD|
|AirPods(3rd gener...| 281.94|Australia|     AUD|
|Apple Pencil (2nd...|  201.1|Australia|     AUD|
|         Apple TV 4K| 251.62|Australia|     AUD|
|      Apple Watch SE| 433.52|Australia|     AUD|
|Apple Watch Series 3| 302.15|Australia|     AUD|
|         MacBook Air| 1514.8|Australia|     AUD|
|         Magic Mouse| 110.15|Australia|     AUD|
|          Sport Band|  69.73|Australia|     AUD|
|                iPad| 504.26|Australia|     AUD|
|            iPad Pro|1211.64|Australia|     AUD|
|           iPhone 12|1009.53|Australia|     AUD|
|           iPhone 13|1211.64|Australia|     AUD|
|           iPhone SE| 686.16|Australia|     AUD|


## Jointure des dataframes et conversion des prix en dollars

In [2]:
# Attach to every price row the conversion ratio of its currency
# (df1.Currency matched against df2.ISO_4217). Inner join: rows whose
# currency has no entry in df2 are dropped.
join_express = df1.Currency == df2.ISO_4217
joined = df1.join(df2, on=join_express, how="inner")

In [3]:
# Convert each local price into dollars: Price / Dollar_To_Curr_Ratio, rounded to 2 decimals.
dollar_price = round(col("Price") / col("Dollar_To_Curr_Ratio"), 2)
joined_and_converted_prices = joined.withColumn("Dollar Prices", dollar_price)
joined_and_converted_prices.show()

+--------------------+-------+---------+--------+--------+--------------------+-------------+
|          Model_name|  Price|  Country|Currency|ISO_4217|Dollar_To_Curr_Ratio|Dollar Prices|
+--------------------+-------+---------+--------+--------+--------------------+-------------+
|        24-inch iMac|1919.01|Australia|     AUD|     AUD|                 1.4|      1370.72|
|         AirPods Max| 908.47|Australia|     AUD|     AUD|                 1.4|       648.91|
|         AirPods Pro|  403.2|Australia|     AUD|     AUD|                 1.4|        288.0|
|AirPods(2nd gener...| 221.31|Australia|     AUD|     AUD|                 1.4|       158.08|
|AirPods(3rd gener...| 281.94|Australia|     AUD|     AUD|                 1.4|       201.39|
|Apple Pencil (2nd...|  201.1|Australia|     AUD|     AUD|                 1.4|       143.64|
|         Apple TV 4K| 251.62|Australia|     AUD|     AUD|                 1.4|       179.73|
|      Apple Watch SE| 433.52|Australia|     AUD|     AUD|  

In [4]:
# The join key and the ratio are not needed once the dollar price exists.
joined_and_converted_prices = joined_and_converted_prices.drop("ISO_4217", "Dollar_To_Curr_Ratio")
joined_and_converted_prices.show(1000)

+--------------------+---------+--------------+--------+-------------+
|          Model_name|    Price|       Country|Currency|Dollar Prices|
+--------------------+---------+--------------+--------+-------------+
|        24-inch iMac|  1919.01|     Australia|     AUD|      1370.72|
|         AirPods Max|   908.47|     Australia|     AUD|       648.91|
|         AirPods Pro|    403.2|     Australia|     AUD|        288.0|
|AirPods(2nd gener...|   221.31|     Australia|     AUD|       158.08|
|AirPods(3rd gener...|   281.94|     Australia|     AUD|       201.39|
|Apple Pencil (2nd...|    201.1|     Australia|     AUD|       143.64|
|         Apple TV 4K|   251.62|     Australia|     AUD|       179.73|
|      Apple Watch SE|   433.52|     Australia|     AUD|       309.66|
|Apple Watch Series 3|   302.15|     Australia|     AUD|       215.82|
|         MacBook Air|   1514.8|     Australia|     AUD|       1082.0|
|         Magic Mouse|   110.15|     Australia|     AUD|        78.68|
|     

## Écart à la moyenne (par rapport aux États-Unis)

In [5]:
# Average dollar price per country, rounded to 2 decimals, sorted by country.
# The output column is named with an explicit alias: withColumnRenamed on a
# Spark auto-generated name is a silent no-op if that name ever differs.
# Note: round/avg here are pyspark.sql.functions (star import), not builtins.
avg_prices = (
    joined_and_converted_prices
    .groupBy("Country")
    .agg(round(avg(col("Dollar Prices")), 2).alias("Moyenne"))
    .sort("Country")
)
avg_prices.show(100)

+--------------+-------+
|       Country|Moyenne|
+--------------+-------+
|     Australia| 472.28|
|       Austria| 513.85|
|        Canada|  432.0|
|Czech Republic| 525.17|
|       Denmark| 526.69|
|       Finland| 517.59|
|        France|  537.5|
|       Germany| 532.44|
|       Hungary| 510.55|
|         India| 528.46|
|       Ireland| 545.47|
|         Italy| 572.03|
|    Luxembourg| 489.56|
|        Mexico| 552.37|
|   Netherlands| 506.51|
|        Norway|  527.2|
|   Philippines| 459.81|
|        Poland| 503.47|
|      Portugal| 517.59|
|        Russia| 534.24|
|         Spain| 558.75|
|        Sweden| 529.92|
|      Thailand| 417.46|
| United States| 420.18|
+--------------+-------+



In [6]:
# Deviation (in %) of each country's average dollar price from the US average,
# largest first. Values are signed: negative means cheaper than the US.
usa_row = avg_prices.where(col("Country") == "United States").select("Moyenne").first()
if usa_row is None:
    raise ValueError("No 'United States' row in avg_prices: cannot compute the deviation")
usa_average = usa_row["Moyenne"]

# Column arithmetic instead of string-interpolating the float into a SQL expression.
moyennePrix = (
    avg_prices
    .withColumn("Ecart à la moyenne (%)",
                round((col("Moyenne") - usa_average) / usa_average * 100, 2))
    .sort(desc("Ecart à la moyenne (%)"))
)

moyennePrix.show(100)
# NOTE(review): in the recorded run this write failed with "Mkdirs failed to create
# .../moyennePrix/_temporary" — the working directory was not writable; check permissions.
moyennePrix.repartition(1).write.mode("overwrite").format("csv").option("header", "true").save("moyennePrix")

+--------------+-------+----------------------+
|       Country|Moyenne|Ecart à la moyenne (%)|
+--------------+-------+----------------------+
|         Italy| 572.03|                 36.14|
|         Spain| 558.75|                 32.98|
|        Mexico| 552.37|                 31.46|
|       Ireland| 545.47|                 29.82|
|        France|  537.5|                 27.92|
|        Russia| 534.24|                 27.15|
|       Germany| 532.44|                 26.72|
|        Sweden| 529.92|                 26.12|
|         India| 528.46|                 25.77|
|        Norway|  527.2|                 25.47|
|       Denmark| 526.69|                 25.35|
|Czech Republic| 525.17|                 24.99|
|       Finland| 517.59|                 23.18|
|      Portugal| 517.59|                 23.18|
|       Austria| 513.85|                 22.29|
|       Hungary| 510.55|                 21.51|
|   Netherlands| 506.51|                 20.55|
|        Poland| 503.47|                

22/01/19 10:12:11 ERROR FileOutputCommitter: Mkdirs failed to create file:/home/jovyan/work/Projet/moyennePrix/_temporary/0
22/01/19 10:12:12 ERROR Executor: Exception in task 0.0 in stage 39.0 (TID 26)
java.io.IOException: Mkdirs failed to create file:/home/jovyan/work/Projet/moyennePrix/_temporary/0/_temporary/attempt_202201191012123129515317376334303_0039_m_000000_26 (exists=false, cwd=file:/home/jovyan/work/Projet)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.ap

Py4JJavaError: An error occurred while calling o86.save.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: Lost task 0.0 in stage 39.0 (TID 26) (fed2ff99cdec executor driver): java.io.IOException: Mkdirs failed to create file:/home/jovyan/work/Projet/moyennePrix/_temporary/0/_temporary/attempt_202201191012123129515317376334303_0039_m_000000_26 (exists=false, cwd=file:/home/jovyan/work/Projet)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CsvOutputWriter.scala:38)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:161)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:146)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:290)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:218)
	... 41 more
Caused by: java.io.IOException: Mkdirs failed to create file:/home/jovyan/work/Projet/moyennePrix/_temporary/0/_temporary/attempt_202201191012123129515317376334303_0039_m_000000_26 (exists=false, cwd=file:/home/jovyan/work/Projet)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CsvOutputWriter.scala:38)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:161)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:146)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:290)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


## Somme totale

In [None]:
# Total dollar cost of all listed products per country, most expensive first.
# The sort uses the aliased output column itself rather than an intermediate
# aggregate name that is not part of the selected result.
# Note: round/sum here are pyspark.sql.functions (star import), not builtins.
cout_total = (
    joined_and_converted_prices
    .groupBy("Country")
    .agg(round(sum(col("Dollar Prices")), 2).alias("Somme des produits"))
    .sort(desc("Somme des produits"))
)

cout_total.show(1000)
cout_total.repartition(1).write.mode("overwrite").format("csv").option("header", "true").save("cout_total")

## Liste des produits

In [None]:
# Distinct product names, alphabetically. Only the name is kept: price, country
# and currency vary per country, so keeping a single full row per product would
# report an arbitrary country's price.
listeProduit = df1.select("Model_name").distinct().sort("Model_name")
listeProduit.show(1000)
listeProduit.repartition(1).write.mode("overwrite").format("csv").option("header", "true").save("listeProduit")

## Pays le moins cher pour acheter les AirPods Pro

In [None]:
# Cheapest country for the AirPods Pro: keep that model only, order by
# ascending dollar price and take the first row.
airpodsPro = (
    joined_and_converted_prices
    .where(col("Model_name") == "AirPods Pro")
    .orderBy(col("Dollar Prices").asc())
    .limit(1)
)
airpodsPro.show()

In [None]:
# Save the result as a single CSV part file with a header row, replacing any previous output.
airpodsPro.repartition(1).write.csv("airpodsPro", mode="overwrite", header=True)

## Tests exploratoires

In [None]:
# NOTE(review): exploratory cell. It intersects the set of country names with the
# set of model names — two unrelated columns — so the result is presumably empty.
# The next cell computes the products common to all countries. TODO remove this cell.
#joined_and_converted_prices.repartition("Country").sort("Country").show(10000)
joined_and_converted_prices.select("Country").intersect(joined_and_converted_prices.select("Model_name")).show()

In [None]:
# Products sold in every country: start from all model names and intersect
# with the model names of each country in turn (intersect also de-duplicates).
rows = joined_and_converted_prices.select("Country").distinct().collect()
result = joined_and_converted_prices.select("Model_name")
for row in rows:
    # Column comparison rather than an f-string SQL filter, so a country name
    # containing a quote cannot break the expression.
    models_in_country = (
        joined_and_converted_prices
        .filter(col("Country") == row.Country)
        .select("Model_name")
    )
    result = result.intersect(models_in_country)

result.show(1000)

In [None]:
+--------------------+
|          Model_name|
+--------------------+
|                iPad|
|Apple Pencil (2nd...|
|AirPods(3rd gener...|
|      Apple Watch SE|
|AirPods(2nd gener...|
|         AirPods Pro|
|            iPad Pro|
|          Sport Band|
|           iPhone 12|
|         Magic Mouse|
|Apple Watch Series 3|
|           iPhone SE|
|         MacBook Air|
|        24-inch iMac|
|         Apple TV 4K|
+--------------------+

In [None]:
# Models listed for Austria.
austria_rows = joined_and_converted_prices.filter(col("Country") == "Austria")
austria_rows.select("Model_name").show(1000)

## Nouveau test : produits vendus dans tous les pays

In [None]:
# Keep only the products sold in every country, so that country averages
# compare the same basket of products.
# Count distinct countries per model (not rows), so a product listed twice
# in one country cannot make up for a country where it is missing.
model_occurence = (
    joined_and_converted_prices
    .groupBy("Model_name")
    .agg(countDistinct("Country").alias("count"))
)
nb_country = joined_and_converted_prices.select("Country").distinct().count()
list_produit = (
    model_occurence
    .filter(col("count") >= nb_country)
    .withColumnRenamed("Model_name", "Modele")
)
# Semi join: keep the price rows of the retained models, with only the
# price table's columns (Model_name, Price, Country, Currency, Dollar Prices).
hello = joined_and_converted_prices.join(
    list_produit.select(col("Modele").alias("Model_name")),
    on="Model_name",
    how="left_semi",
)

In [None]:
# Average dollar price per country, restricted to the products sold in every
# country (hello), rounded to 2 decimals and sorted by country.
# Explicit alias instead of renaming a Spark auto-generated column name
# (withColumnRenamed is a silent no-op if the generated name differs).
avg_prices = (
    hello
    .groupBy("Country")
    .agg(round(avg(col("Dollar Prices")), 2).alias("Moyenne"))
    .sort("Country")
)
avg_prices.show(100)

In [None]:
# Deviation (in %) of each country's average price — common basket only — from
# the US average, largest first. Signed: negative means cheaper than the US.
usa_row = avg_prices.where(col("Country") == "United States").select("Moyenne").first()
if usa_row is None:
    raise ValueError("No 'United States' row in avg_prices: cannot compute the deviation")
usa_average = usa_row["Moyenne"]

moyennePrix = (
    avg_prices
    .withColumn("Ecart à la moyenne (%)",
                round((col("Moyenne") - usa_average) / usa_average * 100, 2))
    .sort(desc("Ecart à la moyenne (%)"))
)

moyennePrix.show(100)
# Written to its own directory so it does not overwrite the all-products
# result saved under "moyennePrix".
moyennePrix.repartition(1).write.mode("overwrite").format("csv").option("header", "true").save("moyennePrix_produits_communs")