In [0]:
%run "./requiredPackages"

In [0]:
%run "./supportFunctions"

In [0]:
import os
import re
import ssl
from json import loads

import pymongo
import pyspark
from bson.objectid import ObjectId
from pymongo import MongoClient
from pymongo import UpdateOne
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.functions import split, col, regexp_extract, regexp_replace, to_timestamp, udf, unix_timestamp, concat_ws
from pyspark.sql.types import StringType

In [0]:
# Start the Spark session for this job, or reuse the one that already exists.
spark = SparkSession.builder.appName("geocode_jobs").getOrCreate()

In [0]:
# Load the job postings from MongoDB into a Spark DataFrame.
#
# The connection string (user, password and cluster host) is read from the
# MONGO_URI environment variable so that no credentials are stored in the
# notebook, e.g. MONGO_URI="mongodb+srv://<user>:<password>@<cluster-host>".
# NOTE(review): the password that used to be hardcoded here should be rotated.
mongo_uri = os.environ.get("MONGO_URI")
if not mongo_uri:
    raise RuntimeError(
        "Set the MONGO_URI environment variable to the MongoDB connection string "
        "(without a database/collection path)"
    )

# The Spark connector takes <connection string>/<database>.<collection>.
jobs_uri = mongo_uri.rstrip("/") + "/Group_B1_JobStock_db.jobs"

df = (spark.read
      .format("mongo")
      .option("uri", jobs_uri)
      .load()
)

In [0]:
# Derive structured fields from the free-text `location` column:
#   country - the first of Germany / Austria / Switzerland / Anywhere found in
#             `location` (regexp_extract yields an empty string when none matches)
#   city    - `location` with whitespace, commas, parenthesised text and the
#             three country names removed
#   date    - `date` parsed as a timestamp with the pattern yyyy-MM-dd
#
# The patterns are raw strings so the regex backslashes stay literal; writing
# '\s' in a normal string literal is an invalid escape sequence in Python.
# NOTE(review): "Anywhere" is extracted as a country but not stripped from
# `city` - confirm that this is intended.
COUNTRY_PATTERN = r'(Germany)|(Austria)|(Switzerland)|(Anywhere)'
CITY_NOISE_PATTERN = r'(\s)|(,)|(\(.*\))|(Germany)|(Austria)|(Switzerland)'

df = (df
       .withColumn('country', regexp_extract(col('location'), COUNTRY_PATTERN, 0))
       .withColumn('city', regexp_replace(col('location'), CITY_NOISE_PATTERN, ""))
       .withColumn('date', to_timestamp(col('date'), 'yyyy-MM-dd'))
      )
# Sanity check (uncomment to inspect the parsed values):
# df.groupBy("country", "city", "date").count().show(100)

In [0]:
# Join the cleaned city and the country into the "<city>, <country>" string
# that is handed to the geocoder.
df = df.withColumn(
    "location_geocoding",
    concat_ws(", ", col("city"), col("country")),
)

In [0]:
# Geocode every location string.
# geo_coder is not defined in this notebook - presumably it is provided by the
# `%run "./supportFunctions"` cell (TODO confirm).
# StringType() is udf's default return type; it is spelled out here because it
# is the reason the resulting `geo_code` column holds a string.
geo_coder_UDF = udf(lambda location: geo_coder(location), StringType())
df = df.withColumn(
    'geo_code',
    geo_coder_UDF(col('location_geocoding')),
)

In [0]:
# Inspect the resulting schema: geo_code shows up as a string column (the UDF's
# return type is StringType), not as a list/array of coordinates, so it has to
# be parsed before the values can be used as numbers.
df.printSchema()

In [0]:
# Connect to MongoDB with pymongo; `mycol` is the jobs collection in the
# Group_B1_JobStock_db database.
#
# The connection string (user, password and cluster host) is read from the
# MONGO_URI environment variable so that no credentials are stored in the
# notebook, e.g. MONGO_URI="mongodb+srv://<user>:<password>@<cluster-host>".
# NOTE(review): the password that used to be hardcoded here should be rotated.
mongo_uri = os.environ.get("MONGO_URI")
if not mongo_uri:
    raise RuntimeError(
        "Set the MONGO_URI environment variable to the MongoDB connection string "
        "(without a database/collection path)"
    )

client = MongoClient(mongo_uri)
mycol = client.Group_B1_JobStock_db.jobs

In [0]:
# Write the extended location info (city, country, longitude, latitude) back
# to MongoDB as the `location_extended` sub-document of each job.
#
# Spark transformations are lazy, so the geocoding UDF is actually evaluated
# when the rows are collected below. The rows are collected to the driver
# because the pymongo collection handle cannot be shipped to the executors.
#
# Updates are sent in batches with bulk_write instead of one round trip per
# document, and rows whose geo_code is missing or cannot be parsed are skipped
# (and counted) instead of aborting the loop half-way through.
BATCH_SIZE = 500  # number of documents per bulk_write round trip


def _parse_lon_lat(geo_code):
    """Return (first, second) value of a string such as "[x, y]" as floats.

    Returns None when the string is empty/None or does not contain two
    numbers. The caller stores the first value as longitude and the second as
    latitude - NOTE(review): confirm this matches the order geo_coder returns.
    """
    if not geo_code:
        return None
    # Strip the surrounding brackets; needed only because geo_code is a string.
    parts = re.sub("[\\[\\]]", "", geo_code).split(",")
    try:
        return float(parts[0]), float(parts[1])
    except (IndexError, ValueError):
        return None


def _flush(requests):
    """Send one batch of updates to MongoDB and print the result counters."""
    res = mycol.bulk_write(requests, ordered=False)
    print(f" batch of {len(requests)}: matched {res.matched_count}, modified {res.modified_count}, ack {res.acknowledged}")


df2 = df.select("_id", "city", "country", "geo_code")

pending = []
skipped = 0
for row in df2.collect():
    lonlat = _parse_lon_lat(row["geo_code"])
    if lonlat is None:
        skipped += 1
        continue
    pending.append(UpdateOne(
        {"_id": ObjectId(row["_id"]["oid"])},
        {"$set": {"location_extended": {"city": row["city"],
                                        "country": row["country"],
                                        "longitude": lonlat[0],
                                        "latitude": lonlat[1]}}},
    ))
    if len(pending) >= BATCH_SIZE:
        _flush(pending)
        pending = []

# Send whatever is left over from the last, partially filled batch.
if pending:
    _flush(pending)
print(f" skipped {skipped} rows without a usable geo_code")