In [1]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, when

In [2]:
import seaborn as sns
import matplotlib.pyplot as plt

In [3]:
from IPython.display import display, HTML
import warnings

In [4]:
display(HTML("<style>.container { width:100% !important; }</style>"))
warnings.filterwarnings('ignore')
# pd.options.display.float_format = "{:.2f}".format

In [5]:
pyspark.__version__

'3.3.2'

In [6]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('eda_preprocessing') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/06/20 20:49:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/06/20 20:49:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [7]:
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "True") \
    .csv("/home/konradballegro/scripts/scraper/outputs/data/offers.csv")

                                                                                

In [8]:
# Count the number of rows in the DataFrame
num_rows = df.count()

# Count the number of columns in the DataFrame
num_cols = len(df.columns)

# Print the shape of the DataFrame
print("Number of rows: ", num_rows)
print("Number of columns: ", num_cols)

[Stage 2:>                                                          (0 + 4) / 4]

Number of rows:  129589
Number of columns:  230


                                                                                

In [9]:
headers = df.columns
for h, head in enumerate(headers):
    print(f"{h}: {head}")

0: Offer from
1: Category
2: Show offers with VIN number
3: Has registration number
4: Vehicle brand
5: Vehicle model
6: Version
7: Generation
8: Year of production
9: Mileage
10: Engine capacity
11: Fuel type
12: Power
13: Gearbox
14: Range
15: Drive
16: Battery capacity
17: Battery ownership type
18: CO2 emissions
19: Particulate filter
20: City fuel consumption
21: Body type
22: Number of doors
23: Number of seats
24: Color
25: Metallic
26: Color type
27: Right-hand drive (Anglik)
28: Country of origin
29: Leasing
30: VAT margin
31: VAT invoice
32: Manufacturer warranty period
33: Financing possibility
34: First registration
35: Registered in Poland
36: First owner
37: Accident-free
38: Serviced at authorized service center
39: Condition
40: ABS
41: Apple CarPlay
42: Android Auto
43: Rear side airbags43
44: Driver side airbag
45: CD
46: Central locking
47: Electric front windows
48: Electrically adjustable passenger seat
49: Electrically adjustable mirrors
50: Immobilizer
51: Driver

In [10]:
# Sample 20% of the rows without replacement
sampled_df = df.sample(withReplacement=False, fraction=0.2)

In [11]:
# Count the number of rows in the DataFrame
num_rows = sampled_df.count()

# Count the number of columns in the DataFrame
num_cols = len(sampled_df.columns)

# Print the shape of the DataFrame
print("Number of rows: ", num_rows)
print("Number of columns: ", num_cols)



Number of rows:  25897
Number of columns:  230


                                                                                

In [12]:
price = sampled_df.select("Price").rdd.flatMap(lambda x: x).collect()

                                                                                

In [13]:
# # Assuming you have a PySpark DataFrame called 'sampled_df' with a column 'Price'
# # Extract the 'Price' column as a list
# price_data = sampled_df.select("Price").rdd.flatMap(lambda x: x).collect()

# # Create a histogram using Seaborn
# sns.histplot(data=price_data, bins=10, kde=True)

# # Customize the plot
# plt.xlabel("Price")
# plt.ylabel("Frequency")
# plt.title("Histogram of Prices")

# # Show the plot
# plt.show()


In [14]:
df = df.filter((df["Currency"] == "PLN") &
                        (df["Country of origin"] == "Polska") &
                        (df["Accident-free"].isNotNull()) &
                        (df["Price"].isNotNull()) &
                        (df["Offer from"].isNotNull()) &
                        (df["Condition"].isNotNull()) &
                        (df["Vehicle brand"].isNotNull()) &
                        (df["Vehicle model"].isNotNull()) &
                        (df["Year of production"].isNotNull()) &
                        (df["Mileage"].isNotNull()) &
                        (df["Fuel type"].isNotNull()) &
                        (df["Power"].isNotNull()) &
                        (df["Gearbox"].isNotNull()) &
                        (df["Body type"].isNotNull()) &
                        (df["Number of doors"].isNotNull())
                       ).select(col("Price").cast("float").alias("Price"),
                                "Offer from",
                                "Condition",
                                "Vehicle brand",
                                "Vehicle model",
                                col("Year of production").cast("string").alias("Year of production"),
                                regexp_replace(regexp_replace(col("Mileage"), " ", ""), "km", "").cast("float").alias("Mileage"),
                                "Fuel type",
                                regexp_replace(regexp_replace(col("Power"), " ", ""), "KM", "").cast("integer").alias("Power"),
                                "Gearbox",
                                "Body type",
                                "Number of doors",
                                "URL path",
                                "ID",
                                "Epoch"
                               )

In [15]:
# Count the number of rows in the DataFrame
num_rows = df.count()

# Count the number of columns in the DataFrame
num_cols = len(df.columns)

# Print the shape of the DataFrame
print("Number of rows: ", num_rows)
print("Number of columns: ", num_cols)



Number of rows:  33225
Number of columns:  15


                                                                                

In [16]:
df.show()

+--------+---------------+---------+-------------+-------------+------------------+--------+-----------+-----+------------+-------------+---------------+--------------------+--------------------+----------+
|   Price|     Offer from|Condition|Vehicle brand|Vehicle model|Year of production| Mileage|  Fuel type|Power|     Gearbox|    Body type|Number of doors|            URL path|                  ID|     Epoch|
+--------+---------------+---------+-------------+-------------+------------------+--------+-----------+-----+------------+-------------+---------------+--------------------+--------------------+----------+
| 28000.0|Osoby prywatnej|  Używane|        Honda|        Civic|              2010|189347.0|    Benzyna|  140|    Manualna|        Sedan|            4.0|https://www.otomo...|000d1349f23a1d685...|1687241613|
| 92900.0|Osoby prywatnej|  Używane|        Honda|        Civic|              2018| 81240.0|    Benzyna|  182|    Manualna|      Kompakt|            5.0|https://www.otomo..

In [17]:
distinct_offers = df.select("Offer from").distinct().rdd.flatMap(lambda x: x).collect()
distinct_conditions = df.select("Condition").distinct().rdd.flatMap(lambda x: x).collect()
distinct_brands = df.select("Vehicle brand").distinct().rdd.flatMap(lambda x: x).collect()
distinct_models = df.select("Vehicle model").distinct().rdd.flatMap(lambda x: x).collect()
distinct_years = df.select("Year of production").distinct().rdd.flatMap(lambda x: x).collect()
distinct_fuel = df.select("Fuel type").distinct().rdd.flatMap(lambda x: x).collect()
distinct_gearbox = df.select("Gearbox").distinct().rdd.flatMap(lambda x: x).collect()
distinct_body = df.select("Body type").distinct().rdd.flatMap(lambda x: x).collect()
distinct_doors = df.select("Number of doors").distinct().rdd.flatMap(lambda x: x).collect()

                                                                                

In [18]:
for offer in distinct_offers:
    column_name = "Offer_type_" + offer.replace(" ", "_")
    df = df.withColumn(column_name, when(df["Offer from"] == offer, 1).otherwise(0))

In [19]:
for condition in distinct_conditions:
    column_name = "Condition_" + condition.replace(" ", "_")
    df = df.withColumn(column_name, when(df["Condition"] == condition, 1).otherwise(0))

In [20]:
for model in distinct_models:
    column_name = "Vehicle_model_" + model.replace(" ", "_")
    df = df.withColumn(column_name, when(df["Vehicle model"] == model, 1).otherwise(0))

In [21]:
for year in distinct_years:
    column_name = "Year_of_production_" + str(year)
    df = df.withColumn(column_name, when(df["Year of production"] == year, 1).otherwise(0))

In [22]:
for fuel in distinct_fuel:
    column_name = "Fuel_type_" + fuel.replace(" ", "_")
    df = df.withColumn(column_name, when(df["Fuel type"] == fuel, 1).otherwise(0))

In [23]:
for gearbox in distinct_gearbox:
    column_name = "Gearbox_" + gearbox.replace(" ", "_")
    df = df.withColumn(column_name, when(df["Gearbox"] == gearbox, 1).otherwise(0))

In [24]:
for body in distinct_body:
    column_name = "Body_type_" + body.replace(" ", "_")
    df = df.withColumn(column_name, when(df["Body type"] == body, 1).otherwise(0))

In [25]:
for doors in distinct_doors:
    column_name = "Number_of_doors_" + str(doors)
    df = df.withColumn(column_name, when(df["Number of doors"] == doors, 1).otherwise(0))

In [27]:
df = df.filter(df["Price"].isNotNull())

In [28]:
df_pandas = df.toPandas()

23/06/20 20:52:59 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

In [29]:
df_pandas

Unnamed: 0,Price,Offer from,Condition,Vehicle brand,Vehicle model,Year of production,Mileage,Fuel type,Power,Gearbox,...,Body_type_Coupe,Body_type_Minivan,Body_type_Kompakt,Body_type_Auta_miejskie,Body_type_Kombi,Number_of_doors_4.0,Number_of_doors_3.0,Number_of_doors_2.0,Number_of_doors_5.0,Number_of_doors_6.0
0,28000.0,Osoby prywatnej,Używane,Honda,Civic,2010,189347.0,Benzyna,140,Manualna,...,0,0,0,0,0,1,0,0,0,0
1,92900.0,Osoby prywatnej,Używane,Honda,Civic,2018,81240.0,Benzyna,182,Manualna,...,0,0,1,0,0,0,0,0,1,0
2,89000.0,Osoby prywatnej,Używane,Honda,CR-V,2016,86500.0,Benzyna,155,Manualna,...,0,0,0,0,0,0,0,0,1,0
3,48900.0,Osoby prywatnej,Używane,Honda,CR-V,2010,218350.0,Benzyna+LPG,150,Manualna,...,0,0,0,0,0,0,0,0,1,0
4,7500.0,Osoby prywatnej,Używane,Honda,Civic,2001,245000.0,Benzyna,90,Manualna,...,0,0,1,0,0,0,0,0,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
33192,259000.0,Firmy,Używane,Alpine,A110,2018,3000.0,Benzyna,252,Automatyczna,...,1,0,0,0,0,0,0,1,0,0
33193,316900.0,Firmy,Nowe,Alpine,A110,2023,1.0,Benzyna,252,Automatyczna,...,1,0,0,0,0,0,0,1,0,0
33194,340700.0,Firmy,Nowe,Alpine,A110,2023,1.0,Benzyna,252,Automatyczna,...,1,0,0,0,0,0,0,1,0,0
33195,359283.0,Firmy,Używane,Alpine,A110,2021,6200.0,Benzyna,292,Automatyczna,...,1,0,0,0,0,0,0,1,0,0


In [30]:
df_pandas.isnull().sum()

Price                  0
Offer from             0
Condition              0
Vehicle brand          0
Vehicle model          0
                      ..
Number_of_doors_4.0    0
Number_of_doors_3.0    0
Number_of_doors_2.0    0
Number_of_doors_5.0    0
Number_of_doors_6.0    0
Length: 562, dtype: int64