In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import col, udf, length, regexp_replace
from pyspark.sql.types import StringType, StructType, StructField, FloatType, IntegerType
import pymongo
import requests
import spacy
import json
import pandas as pd
import sys

In [2]:
# Import the connection credentials helper from the parent directory.
# The parent directory is placed on the import path only for the duration of
# the import and removed again right after.
_parent_dir = '../'
sys.path.append(_parent_dir)
from secret import credentials
sys.path.remove(_parent_dir)

In [3]:
# Instantiate the credentials object; its `connect_string` attribute is used
# below as the MongoDB connection URI.
sec = credentials()

In [4]:
# Package coordinate of the MongoDB Spark connector, passed to
# `spark.jars.packages` when the session is built.
# Replace with the version of the connector that matches your Spark version.
connector_package = "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1"

In [5]:
# Create (or reuse) a SparkSession with the mongo-spark-connector on the classpath.
#
# The connector pinned in `connector_package` is 3.x, which reads its default
# connection strings from `spark.mongodb.input.uri` / `spark.mongodb.output.uri`.
# The `spark.mongodb.read/write.connection.uri` keys are the connector 10.x
# names; they are kept so the session configuration stays valid if the
# connector package is upgraded later.
spark = (
    SparkSession.builder
    .appName("etl")
    .config("spark.mongodb.input.uri", sec.connect_string)
    .config("spark.mongodb.output.uri", sec.connect_string)
    .config("spark.mongodb.read.connection.uri", sec.connect_string)
    .config("spark.mongodb.write.connection.uri", sec.connect_string)
    .config("spark.jars.packages", connector_package)
    .getOrCreate()
)

:: loading settings :: url = jar:file:/opt/conda/envs/csgy-6513d-fall2023/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cdc99772-462e-48f7-ad75-dd8f205b690a;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 480ms :: artifacts dl 15ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts

In [6]:
# Only the users table is loaded in this notebook; the equivalent reads for the
# books and ratings tables are left here, disabled, for reference.
# books = spark.read.option('header','True').csv('../data/Books.csv', inferSchema = True)
# ratings = spark.read.option('header','True').csv('../data/Ratings.csv', inferSchema = True)

# Read Users.csv, taking column names from the header row and letting Spark
# infer the column types.
users = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('../data/Users.csv')
)

                                                                                

In [7]:
# Preview the first rows of the raw users table as loaded from the CSV.
users.show()

+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|18.0|
|      3|moscow, yukon ter...|NULL|
|      4|porto, v.n.gaia, ...|17.0|
|      5|farnborough, hant...|NULL|
|      6|santa monica, cal...|61.0|
|      7| washington, dc, usa|NULL|
|      8|timmins, ontario,...|NULL|
|      9|germantown, tenne...|NULL|
|     10|albacete, wiscons...|26.0|
|     11|melbourne, victor...|14.0|
|     12|fort bragg, calif...|NULL|
|     13|barcelona, barcel...|26.0|
|     14|mediapolis, iowa,...|NULL|
|     15|calgary, alberta,...|NULL|
|     16|albuquerque, new ...|NULL|
|     17|chesapeake, virgi...|NULL|
|     18|rio de janeiro, r...|25.0|
|     19|           weston, ,|14.0|
|     20|langhorne, pennsy...|19.0|
+-------+--------------------+----+
only showing top 20 rows



In [8]:
# Inspect the inferred column types.
# NOTE(review): Age is inferred as string even though the preview shows
# numeric-looking values — confirm whether it should be cast before use.
users.printSchema()

root
 |-- User-ID: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)



## Data preprocessing 

In [9]:
def get_country(location):
    """Return the country component of a comma-separated location string.

    The country is taken to be the text after the last comma, with
    surrounding whitespace removed (e.g. ``"nyc, new york, usa"`` -> ``"usa"``).
    A string without any comma is returned stripped as-is; a trailing comma
    yields an empty string.

    Args:
        location: Location text such as ``"city, region, country"``, or
            ``None`` for a missing value.

    Returns:
        The stripped last component, or ``None`` when ``location`` is ``None``
        (so null rows stay null instead of crashing the UDF).
    """
    if location is None:
        return None
    # Only the last comma matters; surrounding whitespace is stripped afterwards.
    return location.rsplit(',', 1)[-1].strip()

def get_city(location):
    """Return the city component of a comma-separated location string.

    The city is taken to be the text before the first comma, with
    surrounding whitespace removed (e.g. ``"nyc, new york, usa"`` -> ``"nyc"``).
    A string without any comma is returned stripped as-is.

    Args:
        location: Location text such as ``"city, region, country"``, or
            ``None`` for a missing value.

    Returns:
        The stripped first component, or ``None`` when ``location`` is ``None``
        (so null rows stay null instead of crashing the UDF).
    """
    if location is None:
        return None
    # Only the first comma matters; surrounding whitespace is stripped afterwards.
    return location.split(',', 1)[0].strip()


In [10]:
# Wrap the plain-Python location parsers as Spark UDFs that return strings,
# so they can be applied to DataFrame columns.
get_country_udf = udf(f=get_country, returnType=StringType())
get_city_udf = udf(f=get_city, returnType=StringType())



In [11]:
# Derive `country` and `city` columns from the free-text `Location` column.
users = (
    users
    .withColumn('country', get_country_udf(col('Location')))
    .withColumn('city', get_city_udf(col('Location')))
)



In [12]:
# Show the result: preview the table with the derived `country` and `city`
# columns added.
users.show()

[Stage 3:>                                                          (0 + 1) / 1]

+-------+--------------------+----+--------------+--------------+
|User-ID|            Location| Age|       country|          city|
+-------+--------------------+----+--------------+--------------+
|      1|  nyc, new york, usa|NULL|           usa|           nyc|
|      2|stockton, califor...|18.0|           usa|      stockton|
|      3|moscow, yukon ter...|NULL|        russia|        moscow|
|      4|porto, v.n.gaia, ...|17.0|      portugal|         porto|
|      5|farnborough, hant...|NULL|united kingdom|   farnborough|
|      6|santa monica, cal...|61.0|           usa|  santa monica|
|      7| washington, dc, usa|NULL|           usa|    washington|
|      8|timmins, ontario,...|NULL|        canada|       timmins|
|      9|germantown, tenne...|NULL|           usa|    germantown|
|     10|albacete, wiscons...|26.0|         spain|      albacete|
|     11|melbourne, victor...|14.0|     australia|     melbourne|
|     12|fort bragg, calif...|NULL|           usa|    fort bragg|
|     13|b

                                                                                

In [13]:
# Row count of the users table, as a sanity check before writing to MongoDB.
users.count()

                                                                                

278859

In [14]:
# Write the enriched users table to the `users` collection of the `bookdb`
# MongoDB database, using the connection string from the credentials object.
# NOTE(review): the write mode is "append", so re-running this cell adds the
# same rows again — confirm whether that is intended.
(
    users.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", sec.connect_string)
    .option("database", "bookdb")
    .option("collection", "users")
    .mode("append")
    .save()
)

                                                                                