In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, sum, isnull
from pyspark.sql.types import StringType, ArrayType
import string
import re

# Google Maps metadata

In [None]:
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

gmaps_metadata_path = "gs://bucket123321/gmap_clean_data/metadata/part-00000-71b2933c-23b9-4dd1-a76d-cf6229105dac-c000.json"
# Read the JSON files into Spark DataFrames
gmap_metadata = spark.read.json(gmaps_metadata_path)

In [3]:
# Drop rows with missing values in the 'address' and 'category' columns
gmap_metadata = gmap_metadata.dropna(subset=['address', 'category'])

abbreviations = [
    'AL', 'AK', 'AZ', 'AR', 'CA', 'NC', 'SC', 'CO', 'CT', 'ND', 'SD', 'DE',
    'FL', 'GA', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MD',
    'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NJ', 'NY', 'NH', 'NM',
    'OH', 'OK', 'OR', 'PA', 'RI', 'TN', 'TX', 'UT', 'VT', 'VA', 'WV', 'WA',
    'WI', 'WY'
]

states = {
    'Alabama': 'AL',
    'Alaska': 'AK',
    'Arizona': 'AZ',
    'Arkansas': 'AR',
    'California': 'CA',
    'North Carolina': 'NC',
    'South Carolina': 'SC',
    'Colorado': 'CO',
    'Connecticut': 'CT',
    'North Dakota': 'ND',
    'South Dakota': 'SD',
    'Delaware': 'DE',
    'Florida': 'FL',
    'Georgia': 'GA',
    'Hawaii': 'HI',
    'Idaho': 'ID',
    'Illinois': 'IL',
    'Indiana': 'IN',
    'Iowa': 'IA',
    'Kansas': 'KS',
    'Kentucky': 'KY',
    'Louisiana': 'LA',
    'Maine': 'ME',
    'Maryland': 'MD',
    'Massachusetts': 'MA',
    'Michigan': 'MI',
    'Minnesota': 'MN',
    'Mississippi': 'MS',
    'Missouri': 'MO',
    'Montana': 'MT',
    'Nebraska': 'NE',
    'Nevada': 'NV',
    'New Jersey': 'NJ',
    'New York': 'NY',
    'New Hampshire': 'NH',
    'New Mexico': 'NM',
    'Ohio': 'OH',
    'Oklahoma': 'OK',
    'Oregon': 'OR',
    'Pennsylvania': 'PA',
    'Rhode Island': 'RI',
    'Tennessee': 'TN',
    'Texas': 'TX',
    'Utah': 'UT',
    'Vermont': 'VT',
    'Virginia': 'VA',
    'West Virginia': 'WV',
    'Washington': 'WA',
    'Wisconsin': 'WI',
    'Wyoming': 'WY'
}

def filter_state(text):
    # Remove special characters except letters and spaces
    text = re.sub(r'[^a-zA-Z\s]', '', text)

    # Look for state abbreviations or full state names in the address
    for abbr in abbreviations:
        if abbr in text:
            return abbr

    for state in states:
        if state in text:
            return states[state]

    return None

# Define a UDF (User-Defined Function) to apply the filter_state function to a column
filter_state_udf = udf(filter_state, StringType())

# Apply the filter_state UDF to the 'address' column and create a new column 'State' with the filtered state values
gmap_metadata = gmap_metadata.withColumn('State', filter_state_udf(col('address')))

# Drop rows where "State" is null
gmap_metadata = gmap_metadata.na.drop(subset=['State'])

# gmap_metadata.show()

In [4]:

word_filter = [
    'Weight Loss Centers', 'Eyelash salon', 'Barbershop', 'Nail salon', 'Tanning', 'Reflexology', 'Perfume store', 'Massage',
    'Massage Therapy', 'Spray Tanning', 'Beauty', 'Cosmetology Schools', 'Eyebrow Services', 'Hair Extensions', 'Skin care clinic',
    'Nail Technicians', 'Beauty school', 'Day spa', 'Cosmetics store', 'Rolfing', 'Hair Stylists', 'Permanent Makeup',
    'Tanning salon', 'Threading Services', 'Beauty product supplier', 'Beauty & Spas', 'Medical Spas', 'Beauty salon', 'Makeup artist',
    'Blow Dry/Out Services', 'Body Contouring', 'Saunas', 'Hair removal service', 'Hair salon', 'Tattoo and piercing shop',
    'Electrolysis hair removal service', 'Massage studio', 'Naturopathic/Holistic', 'Orthodontist', 'Spa', 'Makeup Artists',
    'Beauty products wholesaler', 'Tattoo', 'Skin Care', 'Halotherapy', 'Facial spa', 'Hair Removal', 'Eyelash Service',
    'Hair Salons', 'Permanent make-up clinic', 'Cosmetics & Beauty Supply', 'Eyelash service', 'Piercing', 'Cosmetic dentist',
    'Wig shop', 'Nail Salons', 'Waxing'
]

# Create a UDF to extract the first word from each array element
extract_first_word = udf(lambda array: array[0] if array else None, StringType())

# Extract the first word from the category array
gmap_metadata = gmap_metadata.withColumn("first_word", extract_first_word(gmap_metadata.category))

# Filter the rows based on the intersection of first_word and word_filter
filtered_metadata = gmap_metadata.filter(gmap_metadata.first_word.isin(word_filter))

# Show the filtered DataFrame
# filtered_metadata.show()

In [5]:
# Define the output JSON file path
output_json_path = 'gs://bucket123321/gmap_clean_data/metadata/filter'

# Repartition the DataFrame to a single partition
filtered_metadata = filtered_metadata.repartition(1)

# Write the DataFrame to a JSON file 
filtered_metadata.write.json(output_json_path)

                                                                                

In [6]:
# !bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON conjunto_testing.filter_gmaps_metadata gs://bucket123321/gmap_clean_data/metadata/filter/*.json

# Filter reviews by beauty_business

In [22]:
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

beauty_business_path = "gs://bucket123321/beauty_business.csv"
gmaps_reviews_path = "gs://bucket123321/gmap_clean_data/reviews/part-00000-fabb6eff-2a57-425e-b58d-cc873c7335d7-c000.json"
yelp_reviews_path = "gs://bucket123321/review.json"

# Read the files into Spark DataFrames
beauty_business = spark.read.csv(beauty_business_path, header=True)
gmaps_reviews = spark.read.json(gmaps_reviews_path)
yelp_reviews = spark.read.json(yelp_reviews_path)

                                                                                

In [23]:
# Transform gmaps_reviews DataFrame
gmaps_reviews = gmaps_reviews.drop("user_id", "name")  # Remove columns
gmaps_reviews = gmaps_reviews.withColumnRenamed("gmap_id", "business_id")  # Rename column
gmaps_reviews = gmaps_reviews.withColumnRenamed("rating", "stars")  # Rename column
gmaps_reviews = gmaps_reviews.withColumnRenamed("formatted_time", "date")  # Rename column

# Transform yelp_reviews DataFrame
yelp_reviews = yelp_reviews.drop("cool", "funny", "user_id", "review_id", "useful")  # Remove columns

In [30]:
beauty_business.printSchema()
gmaps_reviews.printSchema()
yelp_reviews.printSchema()

root
 |-- avg_rating: string (nullable = true)
 |-- id: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- name: string (nullable = true)
 |-- num_of_reviews: string (nullable = true)
 |-- state: string (nullable = true)
 |-- category: string (nullable = true)

root
 |-- date: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- text: string (nullable = true)

root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)



In [25]:
# Union the DataFrames
business_reviews = gmaps_reviews.union(yelp_reviews)

In [31]:
beauty_business.printSchema()
business_reviews.printSchema()

root
 |-- avg_rating: string (nullable = true)
 |-- id: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- name: string (nullable = true)
 |-- num_of_reviews: string (nullable = true)
 |-- state: string (nullable = true)
 |-- category: string (nullable = true)

root
 |-- date: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)



In [33]:
# Join the DataFrames based on business_id and id
business_beauty_reviews = business_reviews.join(beauty_business, business_reviews.business_id == beauty_business.id)

# Select the desired columns from the filtered DataFrame
business_beauty_reviews = business_beauty_reviews.select(business_reviews.date, business_reviews.business_id, business_reviews.stars, business_reviews.text)

# Show the filtered DataFrame
business_beauty_reviews.show()

                                                                                

+-------------------+--------------------+-----+--------------------+
|               date|         business_id|stars|                text|
+-------------------+--------------------+-----+--------------------+
|2017-03-12 20:12:45|0x8752616fa217e71...|  5.0|Very good custome...|
|2019-09-19 09:02:25|0x87bae8b297b4086...|  4.0|Thank you all i a...|
|2019-12-02 19:06:19|0x5490415d892e4a8...|  5.0|                null|
|2019-10-05 23:21:09|0x89b426e51b86442...|  3.0|                null|
|2019-09-25 05:26:51|0x89c25de0b1c19e2...|  5.0|                null|
|2019-01-10 02:44:10|0x80c2c4f11ce7a41...|  5.0|                null|
|2018-12-01 16:34:01|0x808576d09f32119...|  5.0|                null|
|2020-09-13 00:01:54|0x89c51a86164f354...|  5.0|Great salon. Neve...|
|2020-10-22 02:40:27|0x52b3253d922c14b...|  5.0|            The best|
|2018-11-16 12:50:36|0x8861625c8bfaec1...|  2.0|                null|
|2018-09-15 19:14:31|0x89c6fd41ef3aab7...|  5.0|I've been going t...|
|2019-02-22 20:50:25

In [34]:
business_beauty_reviews.count()

                                                                                

1905190

In [35]:
# Define the output JSON file path
output_json_path = 'gs://bucket123321/beauty/'

# Repartition the DataFrame to a single partition
business_beauty_reviews = business_beauty_reviews.repartition(1)

# Write the DataFrame to a JSON file 
business_beauty_reviews.write.json(output_json_path)

                                                                                

In [36]:
# !bq load --autodetect --source_format=NEWLINE_DELIMITED_JSON conjunto_testing.beauty_business_reviews gs://bucket123321/beauty/*.json

Waiting on bqjob_rcc5466dd089d64f_00000188c71cb86a_1 ... (14s) Current status: DONE   
