
## Overview

This notebook shows you how to load data from JDBC databases using Spark SQL.

*For production, you should control the level of parallelism used to read data from the external database, using the parameters described in the documentation.*


### Step 1: Connection Information

This is a **Python** notebook so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` magic command. Python, Scala, SQL, and R are all supported.

First we'll define some variables to let us programmatically create these connections.

In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession

# Initialize SparkSession (Databricks automatically initializes SparkSession for you)
spark = SparkSession.builder.appName("DatabricksPostgres").getOrCreate()

# JDBC connection properties
jdbc_url = "jdbc:postgresql://dbaddress:5432/postgres"  # Replace <port> with the actual port number (5432 is the default port for PostgreSQL)
jdbc_properties = {
    "user": "user",
    "password": "pass",
    "driver": "org.postgresql.Driver"
}

In [0]:
# Read data from PostgreSQL
df = spark.read.jdbc(url=jdbc_url, table='"Laptop"', properties=jdbc_properties)

df.show(5)

+--------------------+---+------+------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+--------------------+---------------------+---------------+
|                 GPU|RAM| Brand| Price|            GPU_Type|           Condition|           Processor| Resolution|Screen_Size| Product_Description|     _airbyte_raw_id|_airbyte_extracted_at|  _airbyte_meta|
+--------------------+---+------+------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+--------------------+---------------------+---------------+
|Intel HD Graphics...| 16|Lenovo|189.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 7th...|       null|         14|Lenovo ThinkPad 1...|56e7fb42-6fc6-498...|  2024-06-12 11:41:42|{"changes": []}|
|Intel UHD Graphic...| 64|  Dell|349.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 8th...|1920 x 1080|       15.6|Dell Latitude 15....|9c73207c-9323-44e...

In [0]:
# Print schema and show initial data
df.printSchema()

root
 |-- GPU: string (nullable = true)
 |-- RAM: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- GPU_Type: string (nullable = true)
 |-- Condition: string (nullable = true)
 |-- Processor: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Screen_Size: string (nullable = true)
 |-- Product_Description: string (nullable = true)
 |-- _airbyte_raw_id: string (nullable = true)
 |-- _airbyte_extracted_at: timestamp (nullable = true)
 |-- _airbyte_meta: string (nullable = true)



In [0]:
df=df.drop('_airbyte_raw_id','_airbyte_extracted_at','_airbyte_meta')
df.columns

Out[88]: ['GPU',
 'RAM',
 'Brand',
 'Price',
 'GPU_Type',
 'Condition',
 'Processor',
 'Resolution',
 'Screen_Size',
 'Product_Description']

Seek and rectify the irregularities in data that might cause problems in future

In [0]:
#EDA question why is RAM and price string instead of numerical?
df.select('RAM').distinct().show()

+----+
| RAM|
+----+
|  64|
| 512|
| 8gb|
|   8|
|  16|
|upto|
|64gb|
|16gb|
|  Up|
|16GB|
|  40|
| 8GB|
|  up|
|8GB,|
|  24|
|  32|
|  20|
|   4|
|32GB|
|  12|
+----+
only showing top 20 rows



In [0]:
df.filter("LOWER(RAM) LIKE '%up%'").show()
df.filter("LOWER(RAM) LIKE '%up%'").count()

+--------------------+----+------+-----+--------------------+---------+--------------------+-----------+-----------+--------------------+
|                 GPU| RAM| Brand|Price|            GPU_Type|Condition|           Processor| Resolution|Screen_Size| Product_Description|
+--------------------+----+------+-----+--------------------+---------+--------------------+-----------+-----------+--------------------+
|Intel Iris Xe Gra...|  Up|Lenovo|  459|Integrated/On-Boa...|      New|Intel Core i5 11t...|1920 x 1080|       15.6|2024 Lenovo Ideap...|
|Intel Iris Xe Gra...|  up|    HP|  599|Integrated/On-Boa...|      New|Intel Core i7 13t...|1920 x 1080|       15.6|*NEW* HP 15.6" Fu...|
|  Intel UHD Graphics|  up|  Asus|  419|Integrated/On-Boa...|      New|Intel Core i3 10t...|1920 x 1080|       15.6|Asus VivoBook 15 ...|
|                null|  up|  Dell|  679|Integrated/On-Boa...|      New|Intel Core i3 8th...| 1366 x 768|       15.6|2020 Dell 15.6" T...|
|Intel Iris Xe Gra...|upto|    HP|

In [0]:
#dropping the rows ie updating df without them
df = df.filter("LOWER(RAM) NOT LIKE '%up%'")

In [0]:
df.count() #confirmed deletion of 5 entries

Out[92]: 2947

Cleaning by converting string to int

In [0]:

from pyspark.sql.functions import regexp_extract, col

# Remove everything except numbers from RAM and convert to integer
df = df.withColumn("RAM", regexp_extract(col("RAM"), "\\d+", 0).cast("int"))

# Convert Price column to float
df = df.withColumn("Price", col("Price").cast("float"))

# Convert Price column to float
df = df.withColumn("Screen_Size", col("Screen_Size").cast("float"))

In [0]:
df.groupBy('Resolution').count().show()

+--------------------+-----+
|          Resolution|count|
+--------------------+-----+
|          1600 x 900|   16|
|           2496x1664|    1|
|         1536 x 1204|    1|
|         3200 x 1800|    1|
|         2240 x 1400|   11|
|          1366 x 786|    2|
|          1600 x 768|    1|
|1920 x 1080 (Full...|    4|
|         2560 x 1600|   67|
|                 FHD|    1|
|         2160 x 1350|    1|
|         2560 x 1440|   17|
|    FHD+ 1920 x 1200|    1|
|                null|  387|
|         2880 x 1800|   17|
|            1366x768|   17|
|15.6" UltraSharp ...|    1|
|         1900 x 1200|   10|
|         2736 x 1824|   19|
|           2560x1440|    1|
+--------------------+-----+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import when, col

# Handle None values and create Touchscreen column
df = df.withColumn('Touchscreen', 
                   when((col('Product_Description').isNotNull()) & (col('Product_Description').contains('Touchscreen')), 1)
                   .otherwise(0))

# Display a sample of the DataFrame
df.sample(False, 0.1).show(5)  # Adjust the sampling fraction and number of rows as needed


+--------------------+---+------+------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+-----------+
|                 GPU|RAM| Brand| Price|            GPU_Type|           Condition|           Processor| Resolution|Screen_Size| Product_Description|Touchscreen|
+--------------------+---+------+------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+-----------+
| NVIDIA Quadro M2200| 16|Lenovo|314.96|  Dedicated Graphics|Very Good - Refur...|Intel Core i7 7th...|1920 x 1080|       15.6|Lenovo ThinkPad P...|          0|
|Intel Iris Xe Gra...|  8|Lenovo|247.45|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 11t...|1920 x 1080|       13.3|Lenovo ThinkPad L...|          0|
|Intel UHD Graphic...| 64|  Dell| 322.0|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 8th...|1920 x 1080|       15.6|Dell 5500 15.6" T...|          0|
|                null|  8|  Dell|1

In [0]:
df.groupBy('Touchscreen').count().show()

+-----------+-----+
|Touchscreen|count|
+-----------+-----+
|          1|  119|
|          0| 2828|
+-----------+-----+



In [0]:
# Handle None values and create Ips column
df = df.withColumn('Ips', 
                   when((col('Product_Description').isNotNull()) & (col('Product_Description').contains('IPS')), 1)
                   .otherwise(0))

# Display a sample of the DataFrame
df.sample(False, 0.1).show(5)  # Adjust the sampling fraction and number of rows as needed


+--------------------+---+------+------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+-----------+---+
|                 GPU|RAM| Brand| Price|            GPU_Type|           Condition|           Processor| Resolution|Screen_Size| Product_Description|Touchscreen|Ips|
+--------------------+---+------+------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+-----------+---+
|Intel UHD Graphic...| 64|  Dell|349.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 8th...|1920 x 1080|       15.6|Dell Latitude 15....|          0|  0|
|   Intel HD Graphics|  8|    HP|119.99|  Dedicated Graphics|            Open box|Intel Core i5 5th...| 1366 x 768|       11.6|HP Elitebook Revo...|          0|  0|
|Intel UHD Graphic...| 64|  Dell| 446.0|  Dedicated Graphics|Very Good - Refur...|Intel Core i7 8th...|1920 x 1080|       15.6|Dell 15.6" NVIDIA...|          0|  0|
|  Intel U

In [0]:
df.groupBy('Ips').count().show()

+---+-----+
|Ips|count|
+---+-----+
|  1|   45|
|  0| 2902|
+---+-----+



In [0]:
df.groupBy('Screen_Size').count().show()

+-----------+-----+
|Screen_Size|count|
+-----------+-----+
|       18.0|    1|
|       15.3|    1|
|       17.0|   31|
|       17.3|   89|
|       14.5|    6|
|       14.1|   19|
|       14.0| 1096|
|       12.5|   49|
|       10.1|    3|
|       10.0|    1|
|       12.0|   11|
|       15.4|    1|
|       12.3|   32|
|       13.3|  235|
|       14.4|    6|
|       16.0|  105|
|       15.5|    1|
|      15.55|    2|
|       12.4|   59|
|       11.6|    7|
+-----------+-----+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import isnull
df.filter(isnull(col('Screen_Size'))).show()

+---+---+-----+-----+--------+---------+---------+----------+-----------+-------------------+-----------+---+
|GPU|RAM|Brand|Price|GPU_Type|Condition|Processor|Resolution|Screen_Size|Product_Description|Touchscreen|Ips|
+---+---+-----+-----+--------+---------+---------+----------+-----------+-------------------+-----------+---+
+---+---+-----+-----+--------+---------+---------+----------+-----------+-------------------+-----------+---+



In [0]:
df.groupBy('Resolution').count().show()

+--------------------+-----+
|          Resolution|count|
+--------------------+-----+
|          1600 x 900|   16|
|           2496x1664|    1|
|         1536 x 1204|    1|
|         3200 x 1800|    1|
|         2240 x 1400|   11|
|          1366 x 786|    2|
|          1600 x 768|    1|
|1920 x 1080 (Full...|    4|
|         2560 x 1600|   67|
|                 FHD|    1|
|         2160 x 1350|    1|
|         2560 x 1440|   17|
|    FHD+ 1920 x 1200|    1|
|                null|  387|
|         2880 x 1800|   17|
|            1366x768|   17|
|15.6" UltraSharp ...|    1|
|         1900 x 1200|   10|
|         2736 x 1824|   19|
|           2560x1440|    1|
+--------------------+-----+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import split

# Assuming 'spark' is your SparkSession object and 'df' is your PySpark DataFrame

# Split Resolution column into X_res and Y_res
df = df.withColumn('Resolution', df['Resolution'].cast('string'))  # Ensure Resolution column is string type

# Split Resolution column into X_res and Y_res
df = df.withColumn('split_resolution', split(df['Resolution'], 'x'))

# Extract X_res and Y_res from split_resolution column
df = df.withColumn('X_res', df['split_resolution'].getItem(0))
df = df.withColumn('Y_res', df['split_resolution'].getItem(1))

# Drop intermediate split_resolution column if no longer needed
df = df.drop('split_resolution')

# Show sample of transformed DataFrame
df.show(5)

+--------------------+---+---------+------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+-----------+---+-----+-----+
|                 GPU|RAM|    Brand| Price|            GPU_Type|           Condition|           Processor| Resolution|Screen_Size| Product_Description|Touchscreen|Ips|X_res|Y_res|
+--------------------+---+---------+------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+-----------+---+-----+-----+
|Intel HD Graphics...| 16|   Lenovo|189.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 7th...|       null|       14.0|Lenovo ThinkPad 1...|          0|  0| null| null|
|Intel UHD Graphic...| 64|     Dell|349.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 8th...|1920 x 1080|       15.6|Dell Latitude 15....|          0|  0|1920 | 1080|
|Intel Iris Xe Gra...|  8|       HP|369.95|Integrated/On-Boa...|                 New|Intel Core i5 1

In [0]:
df.groupBy('X_res').count().show(truncate=False)

+------------------------------------+-----+
|X_res                               |count|
+------------------------------------+-----+
|3200                                |1    |
|15.6" UltraSharp FHD IPS Touch (1920|1    |
|1900                                |10   |
|1536                                |44   |
|1366                                |158  |
|1600                                |17   |
|2496                                |1    |
|FHD                                 |1    |
|1920                                |1934 |
|1536                                |4    |
|null                                |387  |
|2496                                |13   |
|3200                                |1    |
|Touchscreen                         |1    |
|1024                                |2    |
|3840X2400                           |1    |
|3072                                |7    |
|2256                                |1    |
|2160                                |8    |
|3840     

In [0]:
# List of abnormal values
abnormal_values = [
    '15.6', 
    'FHD (1920', 
    'Not Included', 
    '1080p', 
    '1920X1080', 
    'FHD+ 1920', 
    '5K', 
    'FHD', 
    '3840X2400',
    'FHD+ 1920 ',
    'FHD (1920 ', 
    'Touchscreen', 
    '15.6" UltraSharp FHD IPS Touch (1920'
]

# Replace abnormal values with '1920' and handle nulls
df = df.withColumn('X_res', when(df['X_res'].isin(abnormal_values), '1920').otherwise(df['X_res']))
df = df.na.fill({'X_res': '1920'})

# Show the DataFrame to verify changes
df.show()

+--------------------+---+---------+------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+-----------+---+-----+-----+
|                 GPU|RAM|    Brand| Price|            GPU_Type|           Condition|           Processor| Resolution|Screen_Size| Product_Description|Touchscreen|Ips|X_res|Y_res|
+--------------------+---+---------+------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+-----------+---+-----+-----+
|Intel HD Graphics...| 16|   Lenovo|189.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 7th...|       null|       14.0|Lenovo ThinkPad 1...|          0|  0| 1920| null|
|Intel UHD Graphic...| 64|     Dell|349.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 8th...|1920 x 1080|       15.6|Dell Latitude 15....|          0|  0|1920 | 1080|
|Intel Iris Xe Gra...|  8|       HP|369.95|Integrated/On-Boa...|                 New|Intel Core i5 1

In [0]:
unique_x_res = df.select('X_res').distinct().collect()

# Convert the result to a list
unique_x_res_list = [row['X_res'] for row in unique_x_res]

# Print the unique values
print(unique_x_res_list)

['3200', '1900 ', '1536 ', '1366 ', '1600 ', '2496', '1920 ', '1536', '2496 ', '3200 ', '1024 ', '3072 ', '2256', '2160 ', '3840', '2240', '2880 ', '2560', '2736 ', '2240 ', '3240 ', '2560 ', '3456 ', '2256 ', '1366', '2400 ', '3000 ', '1920', '3840 ']


In [0]:
df = df.withColumn('X_res', col('X_res').cast('int'))

In [0]:
df.groupBy('Y_res').count().show(truncate=False)

+--------+-----+
|Y_res   |count|
+--------+-----+
| 2400   |21   |
| 1620   |3    |
| 1080   |1662 |
| 768    |159  |
| 1280   |27   |
| 1024   |43   |
|1664    |1    |
|null    |397  |
| 1920   |22   |
| 1440   |24   |
| 1504   |59   |
| 1080 pi|1    |
|1080)   |1    |
| 2160   |51   |
| 1204   |1    |
| 2000   |3    |
| 1824   |19   |
|2160    |1    |
| 1600   |74   |
|1400    |1    |
+--------+-----+
only showing top 20 rows



In [0]:
abnormal_values = [
    ' 1080 (Full HD)', 
    '1080 Full HD', 
    ' 1080 pixels',
    ' 1080 Full HD',
    ' 1080)',
    ' 1080 pi', 
    '1080)'
]

# Replace abnormal values with '1920' and handle nulls
df = df.withColumn('Y_res', when(df['Y_res'].isin(abnormal_values), '1080').otherwise(df['Y_res']))
df = df.na.fill({'Y_res': '1080'})

# Show the DataFrame to verify changes
df.show()

+--------------------+---+---------+------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+-----------+---+-----+-----+
|                 GPU|RAM|    Brand| Price|            GPU_Type|           Condition|           Processor| Resolution|Screen_Size| Product_Description|Touchscreen|Ips|X_res|Y_res|
+--------------------+---+---------+------+--------------------+--------------------+--------------------+-----------+-----------+--------------------+-----------+---+-----+-----+
|Intel HD Graphics...| 16|   Lenovo|189.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 7th...|       null|       14.0|Lenovo ThinkPad 1...|          0|  0| 1920| 1080|
|Intel UHD Graphic...| 64|     Dell|349.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 8th...|1920 x 1080|       15.6|Dell Latitude 15....|          0|  0|1920 | 1080|
|Intel Iris Xe Gra...|  8|       HP|369.95|Integrated/On-Boa...|                 New|Intel Core i5 1

In [0]:
unique_y_res = df.select('Y_res').distinct().collect()

# Convert the result to a list
unique_y_res_list = [row['Y_res'] for row in unique_y_res]

# Print the unique values
print(unique_y_res_list)

[' 2400', ' 1620', ' 1080', ' 768', ' 1280', ' 1024', '1664', ' 1920', ' 1440', ' 1504', ' 2160', ' 1204', ' 2000', ' 1824', '2160', ' 1600', '1400', '1504', '1080', ' 1200', ' 1800', ' 1350', '1024', '1600', '768', '1440', ' 900', ' 1400', ' 1664', '1800', ' 786']


In [0]:
df = df.withColumn('Y_res', col('Y_res').cast('int'))

In [0]:
from pyspark.sql.types import FloatType, DoubleType, IntegerType, LongType
columns = df.columns

# Calculate correlation with 'Price'
correlations = {}
for col in columns:
    if col != 'Price' and df.schema[col].dataType in (FloatType(), DoubleType(), IntegerType(), LongType()):
        correlation = df.stat.corr('Price', col)
        correlations[col] = correlation

# Print the correlation values
for col, corr in correlations.items():
    print(f"Corr 'Price' and '{col}': {corr}")

Corr 'Price' and 'RAM': 0.11908970720724017
Corr 'Price' and 'Screen_Size': 0.15560943440294497
Corr 'Price' and 'Touchscreen': 0.08163186068716557
Corr 'Price' and 'Ips': 0.008941628355080208
Corr 'Price' and 'Y_res': 0.25011589472431467


In [0]:
from pyspark.sql.functions import col, sqrt
df = df.withColumn('ppi', (sqrt(col('X_res')**2 + col('Y_res')**2) / col('Screen_Size')).cast('float'))

# Show a sample of the DataFrame
df.select('X_res', 'Y_res', 'Screen_Size', 'ppi').show()

+-----+-----+-----------+---------+
|X_res|Y_res|Screen_Size|      ppi|
+-----+-----+-----------+---------+
| 1920| 1080|       14.0|157.35051|
|1920 | 1080|       15.6|141.21199|
|1920 | 1080|       15.6|141.21199|
|1920 | 1080|       15.6|141.21199|
|2256 | 1504|       13.5|200.84256|
|1920 | 1080|       15.6|141.21199|
|1920 | 1080|       15.6|141.21199|
|2256 | 1504|       13.5|200.84256|
| 1920| 1080|       14.0|157.35051|
|1920 | 1080|       15.6|141.21199|
|1920 | 1080|       14.0|157.35051|
|1920 | 1080|       14.0|157.35051|
|1920 | 1200|       13.4|168.96683|
|1920 | 1080|       14.0|157.35051|
|1920 | 1080|       14.0|157.35051|
|1920 | 1080|       13.3|165.63211|
|1920 | 1080|       15.6|141.21199|
|1920 | 1080|       15.6|141.21199|
|1920 | 1080|       15.6|141.21199|
|1366 |  768|       14.0| 111.9352|
+-----+-----+-----------+---------+
only showing top 20 rows



In [0]:
columns = df.columns

# Calculate correlation with 'Price'
correlations = {}
for col in columns:
    if col != 'Price' and df.schema[col].dataType in (FloatType(), DoubleType(), IntegerType(), LongType()):
        correlation = df.stat.corr('Price', col)
        correlations[col] = correlation

# Print the correlation values
for col, corr in correlations.items():
    print(f"Corr 'Price' and '{col}': {corr}")

Corr 'Price' and 'RAM': 0.11908970720724017
Corr 'Price' and 'Screen_Size': 0.15560943440294497
Corr 'Price' and 'Touchscreen': 0.08163186068716557
Corr 'Price' and 'Ips': 0.008941628355080208
Corr 'Price' and 'Y_res': 0.25011589472431467
Corr 'Price' and 'ppi': 0.15288746622775373


In [0]:
# Drop the columns from the PySpark DataFrame
df = df.drop('Screen_Size', 'X_res', 'Y_res', 'Resolution')

# Show the DataFrame to confirm the columns have been dropped
df.show()

+--------------------+---+---------+------+--------------------+--------------------+--------------------+--------------------+-----------+---+---------+
|                 GPU|RAM|    Brand| Price|            GPU_Type|           Condition|           Processor| Product_Description|Touchscreen|Ips|      ppi|
+--------------------+---+---------+------+--------------------+--------------------+--------------------+--------------------+-----------+---+---------+
|Intel HD Graphics...| 16|   Lenovo|189.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 7th...|Lenovo ThinkPad 1...|          0|  0|157.35051|
|Intel UHD Graphic...| 64|     Dell|349.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 8th...|Dell Latitude 15....|          0|  0|141.21199|
|Intel Iris Xe Gra...|  8|       HP|369.95|Integrated/On-Boa...|                 New|Intel Core i5 11t...|NEW HP 15 Laptop....|          0|  0|141.21199|
|Intel Iris Xe Gra...| 16|   Lenovo| 499.0|Integrated/On-Boa...|            

In [0]:
unique_processor = df.select('Processor').distinct().collect()

# Convert the result to a list
unique_processor = [row['Processor'] for row in unique_processor]

# Print the unique values
print(unique_processor)

['AMD Ryzen 7 5000 Series', 'Intel Core i3 5th Gen.', 'Intel Core i7 8th Gen.', 'Intel Core i3 6th Gen.', 'Intel Core i7 13th Gen.', 'Intel Core i7 12th Gen.', 'Intel Core i3 12th Gen.', 'Intel Core i7 11th Gen.', 'Intel Core i3 4th Gen.', 'Intel Core i5 11th Gen.', 'Intel Core i7 7th Gen.', 'Intel Core i9 10th Gen.', 'AMD Ryzen 7 6000 Series', 'Intel Core i3 10th Gen.', 'AMD Ryzen 5 5000 Series', 'Intel Core i3 11th Gen.', 'AMD Ryzen 5 6000 Series', 'Intel Core i5 12th Gen.', 'Intel Core i9 12th Gen.', 'AMD Ryzen 9 6000 Series', 'Intel Core i3 13th Gen.', 'Intel Core i7 4th Gen.', 'Intel Core i9 8th Gen.', 'AMD Ryzen 7 4000 Series', 'AMD Ryzen 5 3000 Series', 'Intel Core i5 4th Gen.', 'Intel Core i7 9th Gen.', 'Intel Core i5 8th Gen.', 'Intel Core i7 5th Gen.', 'Intel Core i5 7th Gen.', 'Intel Core i7 6th Gen.', 'Intel Core i9 13th Gen.', 'AMD Ryzen 5 7000 Series', 'Intel Core i7 8th Gen. vPRO', 'Intel Core i5 5th Gen.', 'Intel Core i5 6th Gen.', 'AMD Ryzen 7 7000 Series', 'Intel Core

In [0]:
from pyspark.sql.functions import split, concat_ws, array

# Split the 'Processor' column into an array of words and then take the first three words
df = df.withColumn('Cpu Name', concat_ws(" ", array([split(df['Processor'], ' ')[i] for i in range(3)])))

# Show the resulting DataFrame
df.show()


+--------------------+---+---------+------+--------------------+--------------------+--------------------+--------------------+-----------+---+---------+-------------+
|                 GPU|RAM|    Brand| Price|            GPU_Type|           Condition|           Processor| Product_Description|Touchscreen|Ips|      ppi|     Cpu Name|
+--------------------+---+---------+------+--------------------+--------------------+--------------------+--------------------+-----------+---+---------+-------------+
|Intel HD Graphics...| 16|   Lenovo|189.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 7th...|Lenovo ThinkPad 1...|          0|  0|157.35051|Intel Core i5|
|Intel UHD Graphic...| 64|     Dell|349.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 8th...|Dell Latitude 15....|          0|  0|141.21199|Intel Core i5|
|Intel Iris Xe Gra...|  8|       HP|369.95|Integrated/On-Boa...|                 New|Intel Core i5 11t...|NEW HP 15 Laptop....|          0|  0|141.21199|Intel C

In [0]:
from pyspark.sql.types import StringType

def fetch_processor(text):
    if text in ['Intel Core i7', 'Intel Core i5', 'Intel Core i3']:
        return text
    else:
        if text.split()[0] == 'Intel':
            return 'Other Intel Processor'
        else:
            return 'AMD Processor'

# Register the function as a UDF
fetch_processor_udf = udf(fetch_processor, StringType())

In [0]:
# Apply the UDF to create the 'Cpu brand' column
df = df.withColumn('Cpu brand', fetch_processor_udf(df['Cpu Name']))

# Show the resulting DataFrame
df.show()

+--------------------+---+---------+------+--------------------+--------------------+--------------------+--------------------+-----------+---+---------+-------------+-------------+
|                 GPU|RAM|    Brand| Price|            GPU_Type|           Condition|           Processor| Product_Description|Touchscreen|Ips|      ppi|     Cpu Name|    Cpu brand|
+--------------------+---+---------+------+--------------------+--------------------+--------------------+--------------------+-----------+---+---------+-------------+-------------+
|Intel HD Graphics...| 16|   Lenovo|189.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 7th...|Lenovo ThinkPad 1...|          0|  0|157.35051|Intel Core i5|Intel Core i5|
|Intel UHD Graphic...| 64|     Dell|349.99|Integrated/On-Boa...|Very Good - Refur...|Intel Core i5 8th...|Dell Latitude 15....|          0|  0|141.21199|Intel Core i5|Intel Core i5|
|Intel Iris Xe Gra...|  8|       HP|369.95|Integrated/On-Boa...|                 New|Intel

In [0]:
df.groupBy('Cpu brand').count().show(truncate=False)

+---------------------+-----+
|Cpu brand            |count|
+---------------------+-----+
|Intel Core i3        |283  |
|Other Intel Processor|21   |
|Intel Core i5        |1384 |
|Intel Core i7        |1200 |
|AMD Processor        |59   |
+---------------------+-----+



In [0]:
# Drop the columns from the PySpark DataFrame
df = df.drop('Processor','Cpu name')

# Show the DataFrame to confirm the columns have been dropped
df.show()

+--------------------+---+---------+------+--------------------+--------------------+--------------------+-----------+---+---------+-------------+
|                 GPU|RAM|    Brand| Price|            GPU_Type|           Condition| Product_Description|Touchscreen|Ips|      ppi|    Cpu brand|
+--------------------+---+---------+------+--------------------+--------------------+--------------------+-----------+---+---------+-------------+
|Intel HD Graphics...| 16|   Lenovo|189.99|Integrated/On-Boa...|Very Good - Refur...|Lenovo ThinkPad 1...|          0|  0|157.35051|Intel Core i5|
|Intel UHD Graphic...| 64|     Dell|349.99|Integrated/On-Boa...|Very Good - Refur...|Dell Latitude 15....|          0|  0|141.21199|Intel Core i5|
|Intel Iris Xe Gra...|  8|       HP|369.95|Integrated/On-Boa...|                 New|NEW HP 15 Laptop....|          0|  0|141.21199|Intel Core i5|
|Intel Iris Xe Gra...| 16|   Lenovo| 499.0|Integrated/On-Boa...|                 New|2024 Lenovo Ideap...|          0|

In [0]:
from pyspark.sql.functions import regexp_extract, col, when, max as max_
from pyspark.sql.types import IntegerType

# Define regular expression for matching storage sizes and types
size_pattern = r"(\d+)(TB|GB)\s(HDD|SSD)"

# Extract sizes and types
df = df.withColumn("size", regexp_extract(col("Product_Description"), size_pattern, 1).cast(IntegerType()))
df = df.withColumn("unit", regexp_extract(col("Product_Description"), size_pattern, 2))
df = df.withColumn("type", regexp_extract(col("Product_Description"), size_pattern, 3))

# Convert TB to GB
df = df.withColumn("size_gb", when(col("unit") == "TB", col("size") * 1024).otherwise(col("size")))

# Create separate columns for HDD and SSD
df = df.withColumn("HDD_GB_temp", when(col("type") == "HDD", col("size_gb")).otherwise(0))
df = df.withColumn("SSD_GB_temp", when(col("type") == "SSD", col("size_gb")).otherwise(0))

# Aggregate to get total HDD and SSD sizes if there are multiple entries per row
aggregated_df = df.groupBy("Product_Description").agg(
    max_("HDD_GB_temp").alias("HDD_GB"),
    max_("SSD_GB_temp").alias("SSD_GB")
)

# Join the aggregated results back to the original dataframe
df = df.join(aggregated_df, on="Product_Description", how="left")

# Drop temporary columns
df = df.drop("size", "unit", "type", "size_gb", "HDD_GB_temp", "SSD_GB_temp")

# Show the result
df.show(truncate=False)


+--------------------------------------------------------------------------------+-----------------------+---+---------+------+----------------------------+-----------------------+-----------+---+---------+-------------+------+------+
|Product_Description                                                             |GPU                    |RAM|Brand    |Price |GPU_Type                    |Condition              |Touchscreen|Ips|ppi      |Cpu brand    |HDD_GB|SSD_GB|
+--------------------------------------------------------------------------------+-----------------------+---+---------+------+----------------------------+-----------------------+-----------+---+---------+-------------+------+------+
|Lenovo ThinkPad L13 Gen 2 13.3" Laptop i5 11th Gen 256GB SSD 8GB RAM Win 10 (OC)|Intel Iris Xe Graphics |8  |Lenovo   |247.45|Integrated/On-Board Graphics|Very Good - Refurbished|0          |0  |165.63211|Intel Core i5|0     |256   |
|Dell Latitude 5440 14" Intel Core I5-1335U 16GB RAM 512GB S

In [0]:
# Add 512 in SSD_GB where both HDD_GB and SSD_GB are 0
df = df.withColumn("SSD_GB", when((col("HDD_GB") == 0) & (col("SSD_GB") == 0), 512).otherwise(col("SSD_GB")))

# Show the updated DataFrame
df.show(truncate=False)

+--------------------------------------------------------------------------------+-----------------------+---+---------+------+----------------------------+-----------------------+-----------+---+---------+-------------+------+------+
|Product_Description                                                             |GPU                    |RAM|Brand    |Price |GPU_Type                    |Condition              |Touchscreen|Ips|ppi      |Cpu brand    |HDD_GB|SSD_GB|
+--------------------------------------------------------------------------------+-----------------------+---+---------+------+----------------------------+-----------------------+-----------+---+---------+-------------+------+------+
|Lenovo ThinkPad L13 Gen 2 13.3" Laptop i5 11th Gen 256GB SSD 8GB RAM Win 10 (OC)|Intel Iris Xe Graphics |8  |Lenovo   |247.45|Integrated/On-Board Graphics|Very Good - Refurbished|0          |0  |165.63211|Intel Core i5|0     |256   |
|Dell Latitude 5440 14" Intel Core I5-1335U 16GB RAM 512GB S

#ML

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Assuming df is your DataFrame with necessary columns

# Define categorical columns
cat_cols = ['GPU', 'Brand', 'GPU_Type', 'Condition', 'Cpu brand']

# Define numeric columns
num_cols = ['RAM', 'HDD_GB', 'SSD_GB', 'Touchscreen', 'Ips', 'ppi']

In [0]:
# StringIndexer and OneHotEncoder for categorical columns
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index", handleInvalid="keep") for col in cat_cols]
encoders = [OneHotEncoder(inputCol=f"{col}_index", outputCol=f"{col}_encoded") for col in cat_cols]

In [0]:
# Assemble all features into a vector
assembler = VectorAssembler(inputCols=[f"{col}_encoded" for col in cat_cols] + num_cols, outputCol="features")

# Define the model
model = RandomForestRegressor(featuresCol="features", labelCol="Price", seed=42)

# Create a pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, model])

# Fit the pipeline to the data
pipeline_model = pipeline.fit(df)

# Split data into train and test sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Make predictions
predictions = pipeline_model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="Price", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 180.39727520493352


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.ml.evaluation import RegressionEvaluator

# Assuming df is your DataFrame with 'Price' column

# Step 1: Calculate the mean price
mean_price = df.agg(F.mean('Price')).collect()[0][0]

# Step 2: Create predictions based on mean price
baseline_df = df.withColumn('Baseline_Prediction', F.lit(mean_price))

# Step 3: Evaluate the baseline model using RMSE
evaluator = RegressionEvaluator(labelCol='Price', predictionCol='Baseline_Prediction', metricName='rmse')
rmse_baseline = evaluator.evaluate(baseline_df)

print(f"Baseline Model RMSE: {rmse_baseline}")

Baseline Model RMSE: 235.7072632135551


In [0]:
#adding SL number to columns
from pyspark.sql.functions import monotonically_increasing_id
df_with_sl = df.withColumn("SL", monotonically_increasing_id())

# Reorder the columns to make SL the first column
columns = ["SL"] + [col for col in df_with_sl.columns if col != "SL"]
df_with_sl = df_with_sl.select(columns)

df_with_sl.show()

+---+--------------------+---+---------+------+--------------------+--------------------+--------------------+-----------+---+---------+-------------+---+----+
| SL|                 GPU|RAM|    Brand| Price|            GPU_Type|           Condition| Product_Description|Touchscreen|Ips|      ppi|    Cpu brand|HDD| SSD|
+---+--------------------+---+---------+------+--------------------+--------------------+--------------------+-----------+---+---------+-------------+---+----+
|  0|Intel HD Graphics...| 16|   Lenovo|189.99|Integrated/On-Boa...|Very Good - Refur...|Lenovo ThinkPad 1...|          0|  0|157.35051|Intel Core i5|  0| 256|
|  1|Intel UHD Graphic...| 64|     Dell|349.99|Integrated/On-Boa...|Very Good - Refur...|Dell Latitude 15....|          0|  0|141.21199|Intel Core i5|  0|2048|
|  2|Intel Iris Xe Gra...|  8|       HP|369.95|Integrated/On-Boa...|                 New|NEW HP 15 Laptop....|          0|  0|141.21199|Intel Core i5|  0| 256|
|  3|Intel Iris Xe Gra...| 16|   Lenovo|

In [0]:
df_with_sl=df_with_sl.withColumnRenamed('SL','SL No') #renaming columns
df_with_sl.show()

+-----+--------------------+---+---------+------+--------------------+--------------------+--------------------+-----------+---+---------+-------------+---+----+
|SL No|                 GPU|RAM|    Brand| Price|            GPU_Type|           Condition| Product_Description|Touchscreen|Ips|      ppi|    Cpu brand|HDD| SSD|
+-----+--------------------+---+---------+------+--------------------+--------------------+--------------------+-----------+---+---------+-------------+---+----+
|    0|Intel HD Graphics...| 16|   Lenovo|189.99|Integrated/On-Boa...|Very Good - Refur...|Lenovo ThinkPad 1...|          0|  0|157.35051|Intel Core i5|  0| 256|
|    1|Intel UHD Graphic...| 64|     Dell|349.99|Integrated/On-Boa...|Very Good - Refur...|Dell Latitude 15....|          0|  0|141.21199|Intel Core i5|  0|2048|
|    2|Intel Iris Xe Gra...|  8|       HP|369.95|Integrated/On-Boa...|                 New|NEW HP 15 Laptop....|          0|  0|141.21199|Intel Core i5|  0| 256|
|    3|Intel Iris Xe Gra...|

#Filter ops
Used to display rows WRT condition specified

In [0]:
overprices_laptops=df.filter('price>800 and RAM<8') #inside quotes write sql expression
overprices_laptops.show()
overprices_laptops.count()


+---+---+-----+-----+--------+---------+-------------------+-----------+---+---+---------+---+---+
|GPU|RAM|Brand|Price|GPU_Type|Condition|Product_Description|Touchscreen|Ips|ppi|Cpu brand|HDD|SSD|
+---+---+-----+-----+--------+---------+-------------------+-----------+---+---+---------+---+---+
+---+---+-----+-----+--------+---------+-------------------+-----------+---+---+---------+---+---+

Out[327]: 0

#Selection ops
Used for displaying specific columns
add condition with filter

In [0]:

# Example: Select specific columns and filter rows
df.select('Price', 'Product_Description', 'RAM').filter('Price>500 and RAM >64').show()
#display number of rows 
df.select("Price", "Product_Description","RAM").filter('Price>500 and RAM >64').count()


+------+--------------------+---+
| Price| Product_Description|RAM|
+------+--------------------+---+
| 650.0|ASUS Vivobook 15 ...|512|
|769.89|NEW Samsung Galax...|256|
+------+--------------------+---+

Out[328]: 2

#Group By function

In [0]:
#mean/sum/max
df.groupBy('RAM').sum().show()

+---+--------+------------------+----------------+--------+------------------+--------+--------+
|RAM|sum(RAM)|        sum(Price)|sum(Touchscreen)|sum(Ips)|          sum(ppi)|sum(HDD)|sum(SSD)|
+---+--------+------------------+----------------+--------+------------------+--------+--------+
| 12|      72| 2591.979995727539|               1|       1| 859.4726104736328|       0|    1280|
|512|     512|             650.0|               0|       0| 141.2119903564453|       0|       0|
| 16|   21232| 761487.9652309418|              57|      21|216936.88683319092|    3060|  424258|
| 40|      80|  929.989990234375|               0|       1| 314.7010192871094|       0|       0|
| 20|      40| 956.6499938964844|               0|       0|   257.80517578125|       0|    1024|
| 64|    3072|25270.249786376953|               3|       1| 7183.355117797852|       0|   45570|
|  4|     340|27389.969888687134|               4|       2|12760.443382263184|    1524|    7296|
|  8|    9592| 547957.20724701

In [0]:
df.groupBy('RAM').mean().show()

+---+--------+------------------+--------------------+--------------------+------------------+------------------+------------------+
|RAM|avg(RAM)|        avg(Price)|    avg(Touchscreen)|            avg(Ips)|          avg(ppi)|          avg(HDD)|          avg(SSD)|
+---+--------+------------------+--------------------+--------------------+------------------+------------------+------------------+
| 12|    12.0|431.99666595458984| 0.16666666666666666| 0.16666666666666666| 143.2454350789388|               0.0|213.33333333333334|
|512|   512.0|             650.0|                 0.0|                 0.0| 141.2119903564453|               0.0|               0.0|
| 16|    16.0| 573.8417221031965| 0.04295403165033911|0.015825169555388093|163.47919128348977|2.3059532780708363| 319.7121326299925|
| 40|    40.0| 464.9949951171875|                 0.0|                 0.5| 157.3505096435547|               0.0|               0.0|
| 20|    20.0| 478.3249969482422|                 0.0|               

Push transformed df

In [0]:
# Define PostgreSQL connection properties
postgres_url = "jdbc:postgresql://localhost:5432/my_database"
properties = {
    "user": "my_username",
    "password": "my_password",
    "driver": "org.postgresql.Driver"
}

# Write DataFrame to PostgreSQL
df.write \
    .jdbc(url=postgres_url, table="transformed_data", mode="overwrite", properties=properties)