## Creating and Storing the Dataset

- Merge the reviews and metadata files for each state.
- Concatenate the merged data from all states and write it to a single Parquet file.

The objective is a single dataset that we can use throughout the project.
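The plan can be sketched in plain Python on toy records before scaling it up with Spark. This is a minimal illustration, not the notebook's pipeline. `merge_state` and `build_dataset` are hypothetical helpers, and records are plain dicts. The sketch keeps the first metadata row per `gmap_id` and keeps every review. The Spark processing cell later in the notebook additionally keeps only one review per business.

```python
def merge_state(reviews, metadata, state):
    """Inner-join one state's reviews and metadata on gmap_id and tag the state."""
    # Keep the first metadata row per business; rename "name" to avoid a clash
    meta_by_id = {}
    for row in metadata:
        if row["gmap_id"] not in meta_by_id:
            meta = {k: v for k, v in row.items() if k != "name"}
            meta["business_name"] = row["name"]
            meta_by_id[row["gmap_id"]] = meta

    merged = []
    for review in reviews:
        meta = meta_by_id.get(review["gmap_id"])
        if meta is None:  # inner join: drop reviews that have no metadata
            continue
        row = {k: v for k, v in review.items() if k != "name"}
        row["customer_name"] = review["name"]
        row.update(meta)
        row["state"] = state
        merged.append(row)
    return merged


def build_dataset(per_state):
    """Concatenate the merged rows of every state in {state: (reviews, metadata)}."""
    rows = []
    for state, (reviews, metadata) in per_state.items():
        rows.extend(merge_state(reviews, metadata, state))
    return rows


# Toy input: the review for "zz" has no matching metadata and is dropped
per_state = {
    "Alaska": (
        [{"gmap_id": "a1", "name": "Ann", "rating": 5},
         {"gmap_id": "zz", "name": "Bob", "rating": 3}],
        [{"gmap_id": "a1", "name": "Cafe North"}],
    ),
    "Utah": (
        [{"gmap_id": "u1", "name": "Cy", "rating": 4}],
        [{"gmap_id": "u1", "name": "Salt Diner"}],
    ),
}
dataset = build_dataset(per_state)
print(dataset)
```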

In [None]:
from google.colab import auth
auth.authenticate_user()


In [None]:
from google.cloud import storage

project_id = "sharp-matter-449521-u2"
!gcloud config set project {project_id}

Updated property [core/project].


In [None]:
states = [
    "Alabama", "Alaska", "Arizona", "Arkansas", "California", "Colorado", "Connecticut", "Delaware", "Florida", "Georgia",
    "Hawaii", "Idaho", "Illinois", "Indiana", "Iowa", "Kansas", "Kentucky", "Louisiana", "Maine", "Maryland",
    "Massachusetts", "Michigan", "Minnesota", "Mississippi", "Missouri", "Montana", "Nebraska", "Nevada", "New_Hampshire", "New_Jersey",
    "New_Mexico", "New_York", "North_Carolina", "North_Dakota", "Ohio", "Oklahoma", "Oregon", "Pennsylvania", "Rhode_Island", "South_Carolina",
    "South_Dakota", "Tennessee", "Texas", "Utah", "Vermont", "Virginia", "Washington", "West_Virginia", "Wisconsin", "Wyoming"
]
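Each state contributes two archives. Their URLs follow the patterns used in the download cell: `review-{state}_10.json.gz` and `meta-{state}.json.gz`. Note that multi-word state names use underscores, as in the list above. A small helper makes that mapping explicit and easy to test. `source_urls` and `BASE_URL` are hypothetical names and are not part of the original notebook.

```python
BASE_URL = "https://datarepo.eng.ucsd.edu/mcauley_group/gdrive/googlelocal"

def source_urls(state):
    """Return (reviews_url, metadata_url) for a state name such as "New_York"."""
    return (
        f"{BASE_URL}/review-{state}_10.json.gz",
        f"{BASE_URL}/meta-{state}.json.gz",
    )

reviews_url, metadata_url = source_urls("New_York")
print(reviews_url)
print(metadata_url)
```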


In [None]:
import os
import shutil
from urllib.request import urlopen

from google.cloud import storage

# Initializing the Google Cloud Storage client
client = storage.Client()

bucket_name = "my-group-project-bucket-dat490"
bucket = client.get_bucket(bucket_name)

BASE_URL = "https://datarepo.eng.ucsd.edu/mcauley_group/gdrive/googlelocal"

def download_file_fast(url, file_path):
    """Stream the file at `url` to `file_path`; return True on success."""
    print(f"Downloading {file_path} from {url}...")
    try:
        # Copy in chunks instead of reading the whole archive into memory
        with urlopen(url) as resp, open(file_path, "wb") as binfile:
            shutil.copyfileobj(resp, binfile)
        print(f"Download successful: {file_path}")
        return True
    except Exception as e:
        print(f"Error downloading file: {e}")
        return False

def upload_json_gz_to_gcs(url, state_name, file_type):
    """Download a .json.gz file and upload it to the GCS bucket."""
    file_path = f"{state_name}-{file_type}.json.gz"
    # Skip the upload when the download failed
    if not download_file_fast(url, file_path):
        return
    try:
        blob = bucket.blob(file_path)
        blob.upload_from_filename(file_path)
        print(f"{file_path} uploaded successfully to GCS.")
        # Free local disk space once the copy is safely in GCS
        os.remove(file_path)
    except Exception as e:
        print(f"Error uploading file to GCS: {e}")

for state in states:
    review_url = f"{BASE_URL}/review-{state}_10.json.gz"
    metadata_url = f"{BASE_URL}/meta-{state}.json.gz"
    upload_json_gz_to_gcs(review_url, state, "reviews")
    upload_json_gz_to_gcs(metadata_url, state, "metadata")

Downloading Alabama-reviews.json.gz from https://datarepo.eng.ucsd.edu/mcauley_group/gdrive/googlelocal/review-Alabama_10.json.gz...


KeyboardInterrupt: 

In [None]:
from google.cloud import storage

# Initializing GCS client
client = storage.Client()
bucket = client.bucket("my-group-project-bucket-dat490")
destination_bucket = client.bucket("final_dataset_dat490")

In [None]:
# Checking whether the files have been uploaded to the bucket
bucket = client.bucket("my-group-project-bucket-dat490")

# Keep only the .json.gz archives uploaded by the download cell
json_gzip_files = [
    blob.name for blob in client.list_blobs(bucket) if blob.name.endswith(".json.gz")
]

# Use bucket.name: nesting double quotes inside an f-string fails before Python 3.12
print(f"Found {len(json_gzip_files)} .json.gz files in {bucket.name}")


Found 100 .json.gzip files in my-group-project-bucket-dat490
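A count of 100 is consistent with 50 states × 2 files, but it does not prove that every state has both archives. A stricter check compares the blob names against the expected `{state}-reviews.json.gz` and `{state}-metadata.json.gz` names. It is sketched below with a hypothetical `missing_files` helper. The same list also shows which files to re-download if the upload loop is interrupted. In the notebook you would pass `states` and `json_gzip_files`.

```python
def missing_files(states, blob_names):
    """Return the expected archive names that are absent from blob_names."""
    present = set(blob_names)
    expected = [
        f"{state}-{file_type}.json.gz"
        for state in states
        for file_type in ("reviews", "metadata")
    ]
    return [name for name in expected if name not in present]

# Toy example: Utah's metadata archive was never uploaded
missing = missing_files(
    ["Alaska", "Utah"],
    ["Alaska-reviews.json.gz", "Alaska-metadata.json.gz", "Utah-reviews.json.gz"],
)
print(missing)
```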


In [None]:
# Installing all the necessary libraries
!apt-get install openjdk-11-jdk -y
!pip install pyspark
!pip install gcsfs



In [None]:
# Downloading the Google Cloud Storage Connector
# Without this, we cannot read/write in Google Cloud Storage
!wget -P /usr/lib/spark/jars/ https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar


--2025-03-04 19:05:03--  https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar
Resolving storage.googleapis.com (storage.googleapis.com)... 74.125.135.207, 74.125.142.207, 74.125.195.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|74.125.135.207|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 40713341 (39M) [application/java-archive]
Saving to: ‘/usr/lib/spark/jars/gcs-connector-hadoop3-latest.jar’


2025-03-04 19:05:03 (188 MB/s) - ‘/usr/lib/spark/jars/gcs-connector-hadoop3-latest.jar’ saved [40713341/40713341]



In [None]:
from pyspark.sql import SparkSession

# Build a Spark session that can read and write gs:// paths through the GCS connector jar
spark = SparkSession.builder \
    .appName("BigDataProcessing") \
    .config("spark.jars", "/usr/lib/spark/jars/gcs-connector-hadoop3-latest.jar") \
    .config("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") \
    .config("spark.hadoop.fs.gs.auth.service.account.enable", "true") \
    .getOrCreate()

In [None]:
states = [
    "Alabama", "Alaska", "Arizona", "Arkansas", "California", "Colorado", "Connecticut", "Delaware", "Florida", "Georgia",
    "Hawaii", "Idaho", "Illinois", "Indiana", "Iowa", "Kansas", "Kentucky", "Louisiana", "Maine", "Maryland",
    "Massachusetts", "Michigan", "Minnesota", "Mississippi", "Missouri", "Montana", "Nebraska", "Nevada", "New_Hampshire", "New_Jersey",
    "New_Mexico", "New_York", "North_Carolina", "North_Dakota", "Ohio", "Oklahoma", "Oregon", "Pennsylvania", "Rhode_Island", "South_Carolina",
    "South_Dakota", "Tennessee", "Texas", "Utah", "Vermont", "Virginia", "Washington", "West_Virginia", "Wisconsin", "Wyoming"
]

western_states1 = [
    "Alaska",
    "Arizona",
    "California",
    "Colorado",
    "Hawaii",
    "Idaho"
]

western_states2 = [
    "Montana",
    "Nevada",
    "New_Mexico",
    "Oregon",
    "Utah",
    "Washington",
    "Wyoming"
]

northeast_states1 = [
    "Connecticut",
    "Maine",
    "Massachusetts",
    "New_Hampshire",
    "Rhode_Island",
    "Vermont",
    "New_Jersey",
    "New_York",
    "Pennsylvania"
]

midwest_states1 = [
    "Illinois",
    "Indiana",
    "Iowa",
    "Kansas",
    "Michigan",
    "Minnesota"
]
midwest_states2 = [
    "Missouri",
    "Nebraska",
    "North_Dakota",
    "Ohio",
    "South_Dakota",
    "Wisconsin"
]

south_states1 = [
    "Delaware",
    "Florida",
    "Georgia",
    "Maryland",
    "North_Carolina",
    "South_Carolina",
    "Virginia",
    "West_Virginia"
]
south_states2 = [
    "Alabama",
    "Kentucky",
    "Mississippi",
    "Tennessee",
    "Arkansas",
    "Louisiana",
    "Oklahoma",
    "Texas"
]
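The seven regional lists are typed by hand, so it is worth confirming that together they cover each of the 50 states exactly once. The minimal check below uses a hypothetical `check_partition` helper. It also flags names that are not in the master list, such as a stray "New Mexico" instead of "New_Mexico".

```python
from collections import Counter

def check_partition(all_states, groups):
    """Return (missing, duplicated, unknown) states for a split of all_states into groups."""
    counts = Counter(state for group in groups for state in group)
    missing = sorted(set(all_states) - set(counts))
    duplicated = sorted(state for state, n in counts.items() if n > 1)
    unknown = sorted(set(counts) - set(all_states))
    return missing, duplicated, unknown

# Toy example: "Ohio" is listed twice and "Texas" is missing
print(check_partition(["Ohio", "Texas", "Utah"], [["Ohio", "Utah"], ["Ohio"]]))
```

In the notebook, `check_partition(states, [western_states1, western_states2, northeast_states1, midwest_states1, midwest_states2, south_states1, south_states2])` should return three empty lists.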

In [None]:
import subprocess

from pyspark.sql.functions import lit

# GCS bucket paths
input_bucket = "my-group-project-bucket-dat490"
output_bucket = "final_dataset_dat490"
temp_output_path = f"gs://{output_bucket}/temp_parquet_output"

# Regional lists of states to process. The keys name the output files and must
# match the paths read back later (final_data_west1.parquet, final_data_south2.parquet, ...)
state_lists = {
    "west1": western_states1,
    "west2": western_states2,
    "northeast1": northeast_states1,
    "midwest1": midwest_states1,
    "midwest2": midwest_states2,
    "south1": south_states1,
    "south2": south_states2,
}

for list_name, state_list in state_lists.items():
    output_path = f"gs://{output_bucket}/final_data_{list_name}.parquet"
    final_df = None

    for state in state_list:
        reviews_path = f"gs://{input_bucket}/{state}-reviews.json.gz"
        metadata_path = f"gs://{input_bucket}/{state}-metadata.json.gz"

        reviews_df = spark.read.json(reviews_path)
        metadata_df = spark.read.json(metadata_path)

        # Removing the MISC column, which holds miscellaneous descriptive fields we do not use
        if "MISC" in metadata_df.columns:
            metadata_df = metadata_df.drop("MISC")

        # dropDuplicates keeps one (arbitrary) row per gmap_id, so this samples
        # one review per business and one metadata record per business
        reviews_sampled = reviews_df.dropDuplicates(["gmap_id"])
        metadata_sampled = metadata_df.dropDuplicates(["gmap_id"])

        # Both sources have a "name" column; rename them to avoid ambiguity after the join
        reviews_sampled = reviews_sampled.withColumnRenamed("name", "customer_name")
        metadata_sampled = metadata_sampled.withColumnRenamed("name", "business_name")

        # Joining reviews and metadata on "gmap_id"
        merged_df = reviews_sampled.join(metadata_sampled, on="gmap_id", how="inner")

        # Tag each row with its state (withColumn replaces any existing "state" column)
        merged_df = merged_df.withColumn("state", lit(state))

        if final_df is None:
            final_df = merged_df
        else:
            # Match columns by name rather than position, since inferred JSON schemas
            # can differ between states (allowMissingColumns requires Spark 3.1+)
            final_df = final_df.unionByName(merged_df, allowMissingColumns=True)

        print(f"Processed: {state} in {list_name}")

    # Write a single part file into a temporary directory...
    final_df.coalesce(1).write.mode("overwrite").parquet(temp_output_path)

    # ...then move that part file to its final name and delete the temporary directory
    try:
        list_files = subprocess.check_output(
            ["gsutil", "ls", f"{temp_output_path}/"], text=True
        ).splitlines()
        parquet_files = [f for f in list_files if f.endswith(".parquet")]

        if parquet_files:
            subprocess.run(["gsutil", "mv", parquet_files[0], output_path], check=True)
            subprocess.run(["gsutil", "rm", "-r", temp_output_path], check=True)
            print(f"Final Parquet file saved at {output_path}")
        else:
            print("No Parquet file found in temporary directory.")

    except (subprocess.CalledProcessError, OSError) as e:
        print(f"Error renaming Parquet file: {e}")

✅ Processed: Alabama
✅ Processed: Kentucky
✅ Processed: Mississippi
✅ Processed: Tennessee
✅ Processed: Arkansas
✅ Processed: Louisiana
✅ Processed: Oklahoma
✅ Processed: Texas
✅ Final Parquet file saved at gs://final_dataset_dat490/final_data_south2.parquet
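The rename step assumes that Spark writes exactly one `part-*.parquet` file after `coalesce(1)`, next to marker files such as `_SUCCESS`. That filtering logic can be isolated in a pure function and tested without GCS. `pick_single_part_file` is a hypothetical helper. The listing below is made-up sample text, shaped like `gsutil ls` output with one URL per line.

```python
def pick_single_part_file(listing):
    """Return the only .parquet entry in a `gsutil ls` listing, or raise ValueError."""
    lines = [line.strip() for line in listing.splitlines() if line.strip()]
    parts = [line for line in lines if line.endswith(".parquet")]
    if len(parts) != 1:
        # Several part files mean the data was not coalesced; none means nothing was written
        raise ValueError(f"expected exactly one Parquet part file, found {len(parts)}")
    return parts[0]

# Made-up listing for illustration only
sample_listing = """gs://final_dataset_dat490/temp_parquet_output/_SUCCESS
gs://final_dataset_dat490/temp_parquet_output/part-00000-example.snappy.parquet
"""
print(pick_single_part_file(sample_listing))
```

Raising on zero or several matches is stricter than taking `parquet_files[0]`. A silent partial move would otherwise leave a regional file that holds only part of the data.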


In [None]:
bucket_name = "final_dataset_dat490"
parquet_files = [
    f"gs://{bucket_name}/final_data_midwest1.parquet",
    f"gs://{bucket_name}/final_data_midwest2.parquet",
    f"gs://{bucket_name}/final_data_northeast1.parquet",
    f"gs://{bucket_name}/final_data_south1.parquet",
    f"gs://{bucket_name}/final_data_south2.parquet",
    f"gs://{bucket_name}/final_data_west1.parquet",
    f"gs://{bucket_name}/final_data_west2.parquet"
]
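The reader cell must use exactly the file names chosen by the writer. One option is to derive both from a single list of region names. This is a sketch: `region_names` is a hypothetical variable that holds the seven suffixes used in the paths above.

```python
bucket_name = "final_dataset_dat490"
region_names = ["midwest1", "midwest2", "northeast1", "south1", "south2", "west1", "west2"]

# Same paths as the hand-written list, built from one source of truth
parquet_files = [f"gs://{bucket_name}/final_data_{name}.parquet" for name in region_names]
print(parquet_files[0])
```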

In [None]:
df_final = spark.read.parquet(*parquet_files)

In [None]:
df_final.count()

2988811

In [None]:
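# Write the combined data as a single part file; Spark still creates a
# directory named dat490_final_dataset.parquet that contains it
df_final.repartition(1).write.mode("overwrite").parquet("gs://final_dataset_dat490/dat490_final_dataset.parquet")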

In [None]:
df = spark.read.parquet("gs://final_dataset_dat490/dat490_final_dataset.parquet")
df.count()

2988811

In [None]:
df.printSchema()

root
 |-- gmap_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- pics: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- url: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- rating: long (nullable = true)
 |-- resp: struct (nullable = true)
 |    |-- text: string (nullable = true)
 |    |-- time: long (nullable = true)
 |-- text: string (nullable = true)
 |-- time: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- address: string (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- description: string (nullable = true)
 |-- hours: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- business_name: string (nullable 

We have successfully created the final dataset: 2,988,811 rows, stored at gs://final_dataset_dat490/dat490_final_dataset.parquet.

### Dataset
https://datarepo.eng.ucsd.edu/mcauley_group/gdrive/googlelocal/

### References
[1] An Yan, Zhankui He, Jiacheng Li, Tianyang Zhang, Julian McAuley.
The 46th International ACM SIGIR Conference on Research and Development in Information Retrieval (SIGIR), 2023.

[2] Jiacheng Li, Jingbo Shang, Julian McAuley.
Annual Meeting of the Association for Computational Linguistics (ACL), 2022.