In [1]:
import os

import findspark
findspark.init()  # must run before the pyspark imports below so pyspark is importable
from pyspark import SparkConf
from pyspark.sql import SparkSession


# Begin PySpark session

In [2]:


# --- Spark session -----------------------------------------------------------
# Build the configuration first and hand it to the builder. Settings placed on a
# SparkConf that is never passed to the session are silently ignored, so
# "spark.driver.maxResultSize" must be part of the conf given to the builder.
# NOTE: these settings take effect only when this call actually creates the
# session; restart the kernel if a session is already running.
conf = SparkConf()
conf.set("spark.jars", "redshift-jdbc42-2.1.0.20.jar")
conf.set("spark.driver.maxResultSize", "2g")

spark = (
    SparkSession.builder
    .appName("Redshift Connection with PySpark")
    .config(conf=conf)
    .getOrCreate()
)

# --- Redshift connection -----------------------------------------------------
redshift_url = "jdbc:redshift://cars-cluster.c7utbjvxgyyz.eu-north-1.redshift.amazonaws.com:5439/cars"

# Credentials come from the environment so they are never stored in the
# notebook. REDSHIFT_PASSWORD is required (KeyError if unset); REDSHIFT_USER
# falls back to the user name this notebook used previously.
redshift_properties = {
    "user": os.environ.get("REDSHIFT_USER", "aya"),
    "password": os.environ["REDSHIFT_PASSWORD"],
    "driver": "com.amazon.redshift.Driver",
}

# Join each car row with its class record (model, brand, class) and the body
# type recorded for that (model, brand) pair. The parenthesised sub-query is
# passed to the JDBC reader in place of a table name.
cars_query = (
    "(SELECT cars.*, classes.model, classes.brand, classes.class, body.body "
    "FROM cars.prod.cars_data as cars "
    "join cars.prod.cars_classes_data as classes on cars.class_id = classes.class_id "
    "join cars.prod.cars_body_data as body "
    "on classes.model = body.model and classes.brand = body.brand)"
)

# Number of partitions to spread the loaded rows over; adjust as needed.
NUM_PARTITIONS = 1000

car_df = spark.read.jdbc(
    redshift_url,
    cars_query,
    properties=redshift_properties,
).repartition(NUM_PARTITIONS)


23/11/06 19:54:03 WARN Utils: Your hostname, Aya-MacBook.local resolves to a loopback address: 127.0.0.1; using 192.168.1.3 instead (on interface en0)
23/11/06 19:54:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/11/06 19:54:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/11/06 19:54:17 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


## Explore columns

In [8]:
# List the column names of the joined cars DataFrame.
car_df.columns

['car_id',
 'class_id',
 'model_year',
 'ad_date',
 'transmission',
 'price',
 'fingerprint',
 'km',
 'color',
 'fuel',
 'city',
 'model',
 'brand',
 'class',
 'body']

In [6]:
# Count rows per (brand, model) pair, then display them sorted by model name.
brand_model_counts = car_df.groupBy('brand', 'model').count()
brand_model_counts.orderBy('model').show()



+-------+-----+-----+
|  brand|model|count|
+-------+-----+-----+
|   Audi|  100|    1|
|Renault|   11|    1|
|   Fiat| 1100|    7|
|    BMW|  116|   15|
|    BMW|  118|    5|
|Renault|   12|    1|
| Datsun|  120|    1|
|    BMW|  120|    2|
|   Lada| 1200|    4|
|  Mazda|  121|    5|
|   Lada|  124|    1|
|   Fiat|  124|    6|
|   Fiat|  125|    4|
|   Fiat|  126|    1|
|   Fiat|  127|   67|
|   Fiat|  128|  253|
|   Lada| 1300|    4|
|   Fiat| 1300|    2|
|   Seat|  131|    2|
|   Fiat|  131|   65|
+-------+-----+-----+
only showing top 20 rows



                                                                                

We can see that a model name alone is not unique: the same model name can appear under more than one brand (e.g. 120 for both Datsun and BMW, 124 for both Lada and Fiat).

In [9]:
# Count rows per city and display the least-represented cities first.
city_counts = car_df.groupBy('city').count()
city_counts.orderBy('count').show()



+--------------------+-----+
|                city|count|
+--------------------+-----+
|      Ras al Khaimah|    1|
|          El-Alamein|    2|
|            El Gouna|    3|
|                Edku|    5|
|New Administrativ...|    6|
|          Ras Gharib|    7|
|          Kafr Shukr|    7|
|              Safaga|    7|
|            Kom Ombo|    8|
|         El Gamaleya|   14|
|             Rosetta|   19|
|          Abu Hummus|   20|
|            El-Arish|   25|
|                Tala|   27|
|              Amreya|   28|
|       Marsa Matrouh|   28|
|    El Wadi El Gedid|   29|
|              Menouf|   32|
|             Zamalek|   34|
|           El Bagour|   39|
+--------------------+-----+
only showing top 20 rows



                                                                                

We can see that some cities are rarely represented. Grouping cities by governorate would give a better overview of the geographic distribution of the cars.

In [11]:
# Count rows per class value and display up to 500 of them, most frequent first.
class_counts = car_df.groupBy('class').count()
class_counts.orderBy('count', ascending=False).show(500)



+--------------------+-----+
|               class|count|
+--------------------+-----+
|                    |21269|
|                None|10118|
|              Manual|  229|
|            Automtic|  103|
|                 M/T|   54|
|Automatic / High ...|   52|
|           Automatic|   41|
|Automatic / High ...|   40|
|Automtic / Tinted...|   36|
|        Manual / A/C|   36|
|    luxury automatic|   31|
|Automatic / Tinte...|   25|
|           A/T GL HD|   24|
|full option / Aut...|   23|
|   Automatic / Turbo|   22|
|           M/T / F/O|   22|
|ِAutomatic / Limited|   22|
| Automatic / Dynamic|   18|
|             F/O M/T|   16|
|Automatic / full ...|   16|
|Automatic / Pure ...|   15|
|  Automatic / LUXURY|   15|
|Automatic / full ...|   15|
|             manual‏|   15|
|        A/T / Luxury|   13|
|Manual / Tinted G...|   12|
|Automatic / VTI /...|   12|
|        A/T / Active|   12|
|Automatic / full ...|   11|
|       1.6 A/T Sport|   11|
|            A/T / B2|   11|
|           HB

                                                                                

Most cars are either basic or have no class, so we can put all of those in one category. Some classes merely indicate the car's transmission type (automatic or manual). We can remove this data, as there is already a transmission column in the data.

In [12]:
# Count rows per body type and display them, most frequent first.
body_counts = car_df.groupBy('body').count()
body_counts.orderBy('count', ascending=False).show()



+------------+-----+
|        body|count|
+------------+-----+
|       sedan|22255|
|         SUV| 5983|
|   hatchback| 3408|
|         4X4|  959|
|pickup truck|  192|
|       coupe|  125|
|   mini Vans|  111|
|     Minivan|   23|
|       truck|   17|
|       Sedan|   16|
|      sports|    4|
|         Van|    2|
|  sports car|    1|
+------------+-----+



                                                                                