In [1]:
from google.cloud import bigquery
from google.oauth2 import service_account

import findspark

In [2]:
# Authenticate to BigQuery with a service-account key and pull the raw scraped
# listings into a pandas DataFrame (converted to Spark further below).
import os

# Take the key location from the standard GOOGLE_APPLICATION_CREDENTIALS
# variable so the secret's path need not live in the notebook; fall back to the
# original local path so existing runs keep working unchanged.
credentials_path = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    r"C:\Users\Chase\Downloads\used-car-summer-2023-project-b4807c4731d7.json",
)
credentials = service_account.Credentials.from_service_account_file(credentials_path)

bigquery_client = bigquery.Client(credentials=credentials, project='used-car-summer-2023-project')

# .result() waits for the query job and returns its rows, which are then
# materialised as a pandas DataFrame.
query = bigquery_client.query('SELECT * FROM `training_data.raw_listings`').result()

staged_listings = query.to_dataframe()

In [3]:
# Initialise findspark so `pyspark` can be imported in the next cell. The
# trailing find() call is the cell's last expression, so it displays the Spark
# home directory that was resolved (see the output below).
findspark.init()
findspark.find()

'C:\\Users\\Chase\\OneDrive\\Documents\\spark-3.2.2-bin-hadoop3.2'

In [4]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Start (or reuse) a local Spark session using 4 worker threads. Dynamic
# allocation and adaptive query execution are both switched off.
spark_settings = {
    "spark.dynamicAllocation.enabled": "false",
    "spark.sql.adaptive.enabled": "false",
}

spark_builder = SparkSession.builder.appName("UsedCarDataWranglingApp").master("local[4]")
for setting_key, setting_value in spark_settings.items():
    spark_builder = spark_builder.config(setting_key, setting_value)

spark = spark_builder.getOrCreate()
sc = spark.sparkContext

# Last expression: render the session summary as the cell output.
spark

In [5]:
# Import only what is used. A star import here would run after
# `from pyspark.sql.functions import *` and could silently shadow Spark
# function names relied on by later cells.
from IPython.display import HTML, display

# Disable line wrapping inside <pre> blocks so wide text output keeps each row
# on a single line.
display(HTML("<style> pre { white-space: pre !important; }</style>"))

### Step 1a: Load Raw Listings into a Spark DataFrame

In [6]:
# Convert the pandas frame fetched from BigQuery into a Spark DataFrame using an
# explicit DDL schema. NOTE(review): this rebinds `staged_listings` from pandas
# to Spark, so this cell cannot be re-run on its own — re-run from the BigQuery
# load cell instead.
raw_listings_schema = "page_num: integer, vin: string, header: string, trim: string, price: string, mileage: string, location: string, colors: string, condition: string"

staged_listings = spark.createDataFrame(staged_listings, schema=raw_listings_schema)

### Step 1b: Show Current Schema

In [7]:
# Confirm the DDL schema from the conversion cell was applied.
staged_listings.printSchema()

root
 |-- page_num: integer (nullable = true)
 |-- vin: string (nullable = true)
 |-- header: string (nullable = true)
 |-- trim: string (nullable = true)
 |-- price: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- location: string (nullable = true)
 |-- colors: string (nullable = true)
 |-- condition: string (nullable = true)



### Step 2: Drop Duplicate Values

In [8]:
# Collapse rows that share the same vin, header and mileage into one.
# Spark does not guarantee which of the duplicate rows is kept.
dedup_keys = ['vin', 'header', 'mileage']
staged_listings = staged_listings.dropDuplicates(subset=dedup_keys)

### Step 3: Remove Unnecessary Columns

In [9]:
# vin has served its purpose in the duplicate check above; neither it nor
# page_num is part of the wrangled output table.
columns_to_remove = ('page_num', 'vin')
staged_listings = staged_listings.drop(*columns_to_remove)

staged_listings.printSchema()

root
 |-- header: string (nullable = true)
 |-- trim: string (nullable = true)
 |-- price: string (nullable = true)
 |-- mileage: string (nullable = true)
 |-- location: string (nullable = true)
 |-- colors: string (nullable = true)
 |-- condition: string (nullable = true)



### Step 4: Remove All Listings with Troublesome Headers

In [10]:
# Remove listings whose header contains any of these tokens. Presumably they are
# multi-word makes (Land Rover, Alfa Romeo, Aston Martin) that would break the
# single-token "make" split in Step 5 — TODO confirm. Rows with a null header
# are dropped as well, because `~contains(...)` evaluates to null for them.
troublesome_header_tokens = ('Land', 'Alfa', 'Aston')
for header_token in troublesome_header_tokens:
    staged_listings = staged_listings.filter(~col('header').contains(header_token))

### Step 5: Split "header" Column into "year", "make", and "model" Columns

In [11]:

# The header is split on single spaces; tokens 0, 1 and 2 become year, make and
# model. Any "Sponsored" text in the year token is stripped (year stays a
# string at this stage).
header_tokens = split(col('header'), ' ')

staged_listings = (
    staged_listings
    .withColumn('year', regexp_replace(header_tokens.getItem(0), 'Sponsored', ''))
    .withColumn('make', header_tokens.getItem(1))
    .withColumn('model', header_tokens.getItem(2))
)

### Step 6: Clean "price" Column

In [12]:
# Drop listings whose price text contains "Not" (presumably "Not Priced" or
# similar — TODO confirm), then parse the amount after the "$" into an integer.
staged_listings = staged_listings.filter(~col('price').contains('Not'))

staged_listings = (
    staged_listings
    # Raw string: '\$' in a normal literal is an invalid escape sequence
    # (SyntaxWarning on Python 3.12+). The regex receives '\$', a literal
    # dollar sign. Item 1 is the text between the first '$' and any second '$';
    # it is null when there is no '$'.
    .withColumn('price', split(col('price'), r'\$').getItem(1))
    # Strip separators such as ','. Letters are kept, so any leftover text makes
    # the integer cast below yield null.
    .withColumn('price', regexp_replace('price', '[^a-zA-Z0-9]', ''))
    .withColumn('price', col('price').cast('integer'))
)

### Step 7: Clean "trim" Column

In [13]:
# Remove drivetrain codes from the trim text, applied one after another in this
# order. Deleting a code leaves behind the space that separated it from the rest
# of the trim (e.g. "SE AWD" -> "SE "). Leading and trailing whitespace is
# therefore trimmed afterwards, so "SE " and "SE" do not become distinct values.
for drivetrain_code in ('AWD', 'FWD', 'RWD', '4WD', '2WD'):
    staged_listings = staged_listings.withColumn('trim', regexp_replace('trim', drivetrain_code, ''))

# `trim(...)` here is the Spark function from `pyspark.sql.functions`; the
# string 'trim' is the column name.
staged_listings = staged_listings.withColumn('trim', trim(col('trim')))

### Step 8: Clean "mileage" Column

In [14]:
# Strip the " miles" suffix, then every character that is not a letter or digit
# (e.g. ','). Letters survive the second pattern, so any value that is not
# purely numeric after cleanup becomes null on the integer cast.
mileage_text = regexp_replace(col('mileage'), ' miles', '')
mileage_text = regexp_replace(mileage_text, '[^a-zA-Z0-9]', '')
staged_listings = staged_listings.withColumn('mileage', mileage_text.cast('integer'))

### Step 9: Split "colors" Column into "exterior_color" and "interior_color" Columns

In [15]:
# "colors" is split on ", ". The suffixes removed below suggest a format like
# "<ext> exterior, <int> interior" — TODO confirm. The exterior half is
# right-trimmed and the interior half left-trimmed before the suffix is removed.
color_parts = split(col('colors'), ', ')
exterior_color_expr = regexp_replace(rtrim(color_parts.getItem(0)), ' exterior', '')
interior_color_expr = regexp_replace(ltrim(color_parts.getItem(1)), ' interior', '')

staged_listings = (
    staged_listings
    .withColumn('exterior_color', exterior_color_expr)
    .withColumn('interior_color', interior_color_expr)
)

### Step 10: Split "condition" Column into "num_accidents", "num_owners", and "usage_type" Columns

In [16]:
# Split "condition" on ", " into accidents / owners / usage parts. The patterns
# below suggest text like "No accidents reported, 1 Owner, Personal use" —
# TODO confirm against the raw data.
condition_parts = split('condition', ', ')
staged_listings = (
    staged_listings
    .withColumn('num_accidents', condition_parts.getItem(0))
    .withColumn('num_owners', condition_parts.getItem(1))
    .withColumn('usage_type', condition_parts.getItem(2))
)

# Reduce each part to its bare value. The order matters. The plural " accidents"
# is removed before the singular " accident", which turns "No accidents reported"
# into "No reported" and "2 accidents reported" into "2 reported". By then the
# full phrase "No accidents reported" no longer exists, so a rule for it could
# never fire. Only "No reported" is mapped to 0.
staged_listings = (
    staged_listings
    .withColumn('num_accidents', regexp_replace('num_accidents', ' accidents', ''))
    .withColumn('num_accidents', regexp_replace('num_accidents', ' accident', ''))
    .withColumn('num_accidents', regexp_replace('num_accidents', 'No reported', '0'))
    .withColumn('num_accidents', regexp_replace('num_accidents', ' reported', ''))
    .withColumn('num_owners', regexp_replace('num_owners', ' Owners', ''))
    .withColumn('num_owners', regexp_replace('num_owners', ' Owner', ''))
    .withColumn('usage_type', regexp_replace('usage_type', ' use', ''))
)

# Drop rows whose count fields contain 'F' / 'P'. These are presumably
# misaligned condition strings where a part is missing — TODO confirm. Rows
# where either field is null are dropped too, since `~contains` yields null.
staged_listings = (
    staged_listings
    .filter(~col('num_accidents').contains('F'))
    .filter(~col('num_owners').contains('P'))
)

# Any remaining non-numeric text becomes null on the integer cast.
staged_listings = (
    staged_listings
    .withColumn('num_accidents', col('num_accidents').cast('integer'))
    .withColumn('num_owners', col('num_owners').cast('integer'))
)

### Step 11: Split "location" Column into "city" and "state" Columns

In [17]:
# Normalise the free-text location, then split it on ", " into city and state.
# The replacements run in this exact order: digits first, then the " mi - "
# distance separators (presumably what remains of text like "12 mi - " once the
# digits are gone — TODO confirm), then the online-retailer prefix.
location_rules = [
    ('[0-9]', ''),
    (', mi - ', ''),
    (' mi - ', ''),
    ('Online RetailerDelivery Available to ', 'Online'),
]
for location_pattern, location_replacement in location_rules:
    staged_listings = staged_listings.withColumn(
        'location', regexp_replace('location', location_pattern, location_replacement)
    )

staged_listings = (
    staged_listings
    .withColumn('city', split(col('location'), ', ').getItem(0))
    .withColumn('state', split(col('location'), ', ').getItem(1))
)

### Step 12: Remove Surplus Columns and Reorder the Remaining Ones

In [18]:
# Keep only the wrangled columns, in the order the exported table should have.
# Selecting them explicitly also discards header/location/colors/condition.
wrangled_columns = [
    "price", "year", "make", "model", "trim", "mileage",
    "exterior_color", "interior_color",
    "num_accidents", "num_owners", "usage_type",
    "city", "state",
]
staged_listings = staged_listings.select(*wrangled_columns)

### Step 13: Export to BigQuery Table

In [19]:
import pandas_gbq

# Collect the wrangled Spark rows back to the driver as a pandas DataFrame.
wrangled_listings = staged_listings.toPandas()

# if_exists='replace' overwrites the destination table on every run.
pandas_gbq.to_gbq(
    wrangled_listings,
    'used-car-summer-2023-project.training_data.wrangled_training_data',
    project_id='used-car-summer-2023-project',
    if_exists='replace',
)

100%|██████████| 1/1 [00:00<00:00, 1000.07it/s]
