In [10]:
from pyspark.sql import SparkSession
import pandas as pd

#2- Import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.types import (StructField, StringType, IntegerType, StructType, FloatType)

8# controle s'il reste des valeurs nulles
from pyspark.sql.functions import col,isnan,when,count

In [7]:
schema = StructType([ 
    StructField("mpg",FloatType(),True), 
    StructField("cylinders",IntegerType(),True), 
    StructField("displacement",FloatType(),True), 
    StructField("horsepower", IntegerType(), True), 
    StructField("weight", IntegerType(), True), 
    StructField("acceleration", FloatType(), True), 
    StructField("model year", IntegerType(), True), 
    StructField("origin", IntegerType(), True), 
    StructField("car name", StringType(), True) 
  ])

In [8]:
#Create a spark session and declare it in spark variable

spark = SparkSession.builder.appName('Basics').getOrCreate()

In [None]:
#4- Import data from this path s3://dataset-pyspark/auto-mpg.csv and using a pre-defined schema
#5- Print the Dataframe

link = "02_dojo_auto-mpg.csv"

df = spark.read.csv(link, header = True, schema=schema)
df.show()

In [14]:
#6- Print the dataframe columns
#7- Print the dataframe schema

df.printSchema(),df.columns

root
 |-- mpg: float (nullable = true)
 |-- cylinders: integer (nullable = true)
 |-- displacement: float (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- acceleration: float (nullable = true)
 |-- model year: integer (nullable = true)
 |-- origin: integer (nullable = true)
 |-- car name: string (nullable = true)



(None,
 ['mpg',
  'cylinders',
  'displacement',
  'horsepower',
  'weight',
  'acceleration',
  'model year',
  'origin',
  'car name'])

In [None]:
#8- Drop NAs

df = df.dropna()
df.show()


In [16]:
# controle s'il reste des valeurs nulles


df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()


+---+---------+------------+----------+------+------------+----------+------+--------+
|mpg|cylinders|displacement|horsepower|weight|acceleration|model year|origin|car name|
+---+---------+------------+----------+------+------------+----------+------+--------+
|  0|        0|           0|         0|     0|           0|         0|     0|       0|
+---+---------+------------+----------+------+------------+----------+------+--------+



In [None]:
#9- Rename columns df = "model year" to "model_year" and "car name" to "car_name"
# 10- Print the Dataframe

df = df.withColumnRenamed('model year','model_year').withColumnRenamed('car name','car_name')
df.show()

In [None]:
# 11- Show filtred data on mpg > 20

df.filter( 'mpg > 20').show()


In [19]:
# 12- Show filtered data on df.horsepower > 80 
#and df.weight > 1500 
#and select only car_name 
#ad mpg 
#and finally order by descending mpg

df.filter( (df.horsepower > 80) & (df.weight > 1500) )\
    .select(['car_name', 'mpg']) \
    .orderBy('mpg', ascending = False) \
    .show()
    


+--------------------+----+
|            car_name| mpg|
+--------------------+----+
|oldsmobile cutlas...|38.0|
|datsun 510 hatchback|37.0|
|    nissan stanza xe|36.0|
|   dodge charger 2.2|36.0|
|   triumph tr7 coupe|35.0|
|chevrolet cavalie...|34.0|
|      dodge colt m/m|33.5|
|     pontiac phoenix|33.5|
|        datsun 200sx|32.9|
|       datsun 280-zx|32.7|
|       dodge rampage|32.0|
|    toyota celica gt|32.0|
|pontiac j2000 se ...|31.0|
|          chevy s-10|31.0|
|    plymouth reliant|30.0|
|toyota corona lif...|29.8|
|            audi fox|29.0|
|      dodge aries se|29.0|
|  chevrolet citation|28.8|
|buick skylark lim...|28.4|
+--------------------+----+
only showing top 20 rows



In [20]:
#13- Show filtered data on car_name containing "chevrolet" and order by horsepower and acceleration both on descending order

df.filter(df.car_name.contains("chevrolet")) \
    .orderBy(col("mpg"),col("acceleration"), ascending = False).show()


+----+---------+------------+----------+------+------------+----------+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|
+----+---------+------------+----------+------+------------+----------+------+--------------------+
|34.0|        4|       112.0|        88|  2395|        18.0|        82|     1|chevrolet cavalie...|
|32.1|        4|        98.0|        70|  2120|        15.5|        80|     1|  chevrolet chevette|
|30.5|        4|        98.0|        63|  2051|        17.0|        77|     1|  chevrolet chevette|
|30.0|        4|        98.0|        68|  2155|        16.5|        78|     1|  chevrolet chevette|
|29.0|        4|        85.0|        52|  2035|        22.2|        76|     1|  chevrolet chevette|
|28.8|        6|       173.0|       115|  2595|        11.3|        79|     1|  chevrolet citation|
|28.0|        4|       112.0|        88|  2605|        19.6|        82|     1|  chevrolet cavalier|


In [21]:
#14- Create a new column called "brand" which contains only the first word of "car_name" column

import pyspark

split_col = pyspark.sql.functions.split(df['car_name'], ' ')

df = df.withColumn('brand', split_col.getItem(0))
df.show()


+----+---------+------------+----------+------+------------+----------+------+--------------------+----------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|model_year|origin|            car_name|     brand|
+----+---------+------------+----------+------+------------+----------+------+--------------------+----------+
|18.0|        8|       307.0|       130|  3504|        12.0|        70|     1|chevrolet chevell...| chevrolet|
|15.0|        8|       350.0|       165|  3693|        11.5|        70|     1|   buick skylark 320|     buick|
|18.0|        8|       318.0|       150|  3436|        11.0|        70|     1|  plymouth satellite|  plymouth|
|16.0|        8|       304.0|       150|  3433|        12.0|        70|     1|       amc rebel sst|       amc|
|17.0|        8|       302.0|       140|  3449|        10.5|        70|     1|         ford torino|      ford|
|15.0|        8|       429.0|       198|  4341|        10.0|        70|     1|    ford galaxie 500|      ford|
|

In [33]:
#15- Group data by brand and compute horsepower mean

df_mean = df.groupby('brand').mean('horsepower').withColumnRenamed('avg(horsepower)','avg_horsepower')
df_mean.show()


+---------+------------------+
|    brand|    avg_horsepower|
+---------+------------------+
|    buick|136.41176470588235|
|  pontiac|          136.9375|
| mercedes|              77.0|
|   toyota|             83.44|
|     saab|            108.75|
|      amc|114.70370370370371|
|       vw|60.833333333333336|
|  peugeot|              88.0|
| chrysler|153.66666666666666|
| plymouth|111.41935483870968|
|vokswagen|              62.0|
|    chevy|142.33333333333334|
|     audi| 86.71428571428571|
|   datsun| 83.82608695652173|
|      bmw|             111.5|
|    dodge|117.17857142857143|
|     ford|112.22916666666667|
|  toyouta|              97.0|
|    capri|              92.0|
| cadillac|             152.5|
+---------+------------------+
only showing top 20 rows



In [34]:
#16 - Create a SQL view called "autodata"

df_mean.createOrReplaceTempView('autodata')

In [35]:
#17- declare variable called result equals Select all from autodata view

result = spark.sql("SELECT * FROM autodata")
result.show()

+---------+------------------+
|    brand|    avg_horsepower|
+---------+------------------+
|    buick|136.41176470588235|
|  pontiac|          136.9375|
| mercedes|              77.0|
|   toyota|             83.44|
|     saab|            108.75|
|      amc|114.70370370370371|
|       vw|60.833333333333336|
|  peugeot|              88.0|
| chrysler|153.66666666666666|
| plymouth|111.41935483870968|
|vokswagen|              62.0|
|    chevy|142.33333333333334|
|     audi| 86.71428571428571|
|   datsun| 83.82608695652173|
|      bmw|             111.5|
|    dodge|117.17857142857143|
|     ford|112.22916666666667|
|  toyouta|              97.0|
|    capri|              92.0|
| cadillac|             152.5|
+---------+------------------+
only showing top 20 rows



In [36]:
#18- Run a new request by selecting only the raws where hosepower is greater than 150

new_results = spark.sql ("SELECT * FROM autodata WHERE avg_horsepower > 150")
new_results.show()

+--------+------------------+
|   brand|    avg_horsepower|
+--------+------------------+
|chrysler|153.66666666666666|
|cadillac|             152.5|
|      hi|             193.0|
+--------+------------------+

