In [None]:
# 1) First: install Java, Spark and and run a local Spark session by just running this on Google Colab:

!apt update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null   # !apt-get --> install java
!wget -q https://downloads.apache.org/spark/spark-3.1.3/spark-3.1.3-bin-hadoop3.2.tgz  # !wget  --> download file from url
!tar xf spark-3.1.3-bin-hadoop3.2.tgz  # !tar --> like unzip 
!pip install -q findspark  # !pip  --> instal a package, we cant import a library without installing it first, most libraries that we used were already installed

# This are INSTALLATION COMMANDS IN LINUX that we run in our collab space, it's similar to downloading programs an installing them on our computers
# installs Apache Spark 3.1.2, Java 8, and Findspark, a library that makes it easy for Python to find Spark

# 2) Second: set the locations where Spark and Java are installed to let know Colab where to find it.

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.3-bin-hadoop3.2"

# 3) Third: import spark libraries and use them

import findspark
findspark.init("spark-3.1.3-bin-hadoop3.2") # SPARK_HOME

from pyspark.sql import SparkSession

# Create the session - We need to remember to close it at the end
# The session is basically our connection to Spark layer in the Hadoop ecosystem
spark = SparkSession.builder.master("local[4]").getOrCreate()

from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.types import  IntegerType , StringType
from pyspark.context import SparkContext

# Code for connecting our google drive to this collab notebook
from google.colab import drive
drive.mount('/content/drive')

# After you finish, look at the menu (gui to the left) and check under "files" that your "drive" folder was added.

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
[33m0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/88.7 kB 16%] [Connec[0m                                                                               Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
[33m0% [Connecting to archive.ubuntu.com] [1 InRelease 14.2 kB/88.7 kB 16%] [Waitin[0m[33m0% [2 InRelease gpgv 1,581 B] [Connecting to archive.ubuntu.com] [1 InRelease 1[0m                                                                               Hit:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
[33m0% [2 InRelease gpgv 1,581 B] [Connecting to archive.ubuntu.com (91.189.91.38)][0m                                                                               Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:5 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [

In [None]:
#reading csv file to dataframe
spark = SparkSession.builder.appName("users").getOrCreate()

df = spark.read.options(delimiter=';')\
  .csv("/content/drive/MyDrive/spark/BX-Users.csv", header= True)
df.printSchema()
#df.show()

root
 |-- User-ID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)



In [None]:
#spliting the  location column into 3 columns (city, state,country)
df = df.withColumn("city", F.split(F.col("Location"), ",")[0]).withColumn("state", F.split(F.col("Location"), ",")[1])\
.withColumn("country", F.split(F.col("Location"), ",")[2]).drop("Location").withColumnRenamed('User-ID','User_ID')

# setting the age  value  to -1 if the value is null and mnitulating the user column
df.withColumn('User_ID', F.trim(df.User_ID)).filter(F.col("User_ID").isNull()).drop()\
.withColumn("Age", F.when(F.col("Age").isNull(), -1).otherwise(F.col("Age")))

# changing the data type of the columns
df = df.withColumn("Age", F.col("Age").cast(IntegerType()))\
.withColumn("User_ID", F.col("User_ID").cast(IntegerType()))\
.withColumn("city", F.col("city").cast(StringType()))\
.withColumn("state", F.col("state").cast(StringType()))\
.withColumn("country", F.col("country").cast(StringType()))
df.dtypes

[('User_ID', 'int'),
 ('Age', 'int'),
 ('city', 'string'),
 ('state', 'string'),
 ('country', 'string')]

In [None]:
list_chinese_char =(u'[⺀-⺙⺛-⻳⼀-⿕々〇〡-〩〸-〺〻㐀-䶵一-鿃豈-鶴侮-頻並-龎-中]')

# if the country is not in the list of countries, then it is set to unknown  # if the country is in the state or city, then it is set to country
df = df.withColumn("country", F.when(F.col("country").isin(list_chinese_char),"China").otherwise(F.col("country")))\
.withColumn("country", F.when(F.col("city").isin(list_chinese_char),"China").otherwise(F.col("country")))\
.withColumn("country", F.when(F.col("state").isin(list_chinese_char),"China").otherwise(F.col("country")))

In [None]:
# triming empty spaces in first and last char
df = df.withColumn('state', F.trim(df.state)).withColumn('city', F.trim(df.city)).withColumn('country', F.trim(df.country))\
.withColumn("country", F.when(F.col("country").rlike('[^a-z ^A-Z ^-]'), "Unknown").otherwise(F.col("country")))\
.withColumn("state", F.when(F.col("state").rlike('[^a-z ^A-Z ^-]'), "Unknown").otherwise(F.col("state")))\
.withColumn("city", F.when(F.col("city").rlike('[^a-z ^A-Z ^-]'), "Unknown").otherwise(F.col("city")))\
.withColumn("city", F.initcap(F.col("city"))).withColumn("state", F.initcap(F.col("state")))\
.withColumn("country", F.initcap(F.col("country")))\
.withColumn("city", F.when(F.col("city") == '', value= 'Unknown').otherwise(F.col("city")))\
.withColumn("state", F.when(F.col("state")== '', value= 'Unknown').otherwise(F.col("state")))\
.withColumn("country", F.when(F.col("country")== '', value= 'Unknown').otherwise(F.col("country")))\
.withColumn("city", F.when(F.col("city") == ' ', value= 'Unknown').otherwise(F.col("city")))\
.withColumn("state", F.when(F.col("state")== ' ', value= 'Unknown').otherwise(F.col("state")))\
.withColumn("country", F.when(F.col("country")== ' ', value= 'Unknown').otherwise(F.col("country")))\
.withColumn("city", F.when(F.col("city") == '  ', value= 'Unknown').otherwise(F.col("city")))\
.withColumn("state", F.when(F.col("state")== '  ', value= 'Unknown').otherwise(F.col("state")))\
.withColumn("country", F.when(F.col("country")== '  ', value= 'Unknown').otherwise(F.col("country")))\
.withColumn("country", F.when(F.col("country")== 'Usa','United States').otherwise(F.col("country")))

In [None]:
list_country =['Republic Of Singapore','Republic Of Korea','The Philippines','Isreal','Syria','Trinidad And Tobago','South Korea','Venezuela','Bosnia And Herzegovina','Yugoslavia','Taiwan','Vietnam','Iran','Russia','Afghanistan',\
'Aland Islands', 'Albania', 'Algeria', 'American Samoa', 'Andorra', 'Angola', 'Anguilla', 'Antarctica', 'Antigua and Barbuda', 'Argentina', 'Armenia', 'Aruba', 'Australia', 'Austria', 'Azerbaijan', 'Bahamas', 'Bahrain',\
'Bangladesh', 'Barbados', 'Belarus', 'Belgium', 'Belize', 'Benin', 'Bermuda', 'Bhutan', 'Bolivia','Bolivia, Plurinational State of', 'Bonaire, Sint Eustatius and Saba', 'Bosnia and Herzegovina', 'Botswana', 'Bouvet Island', \
'Brazil', 'British Indian Ocean Territory', 'Brunei','Brunei Darussalam', 'Bulgaria', 'Burkina Faso', 'Burundi', 'Cambodia', 'Cameroon', 'Canada', 'Cape Verde', 'Cayman Islands', 'Central African Republic', 'Chad', 'Chile', \
'China', 'Christmas Island', 'Cocos (Keeling) Islands', 'Colombia', 'Comoros', 'Congo', 'Congo','Congo, The Democratic Republic of the', 'Cook Islands', 'Costa Rica', 'Ivory Coast',"Côte d'Ivoire", 'Croatia', 'Cuba', 'Curaçao', \
'Cyprus', 'Czechia','Czech','Czech Republic', 'Denmark', 'Djibouti', 'Dominica', 'Dominican Republic', 'Ecuador', 'Egypt', 'El Salvador', 'Equatorial Guinea', 'Eritrea', 'Estonia', 'Ethiopia', 'Falkland Islands (Malvinas)', \
'Faroe Islands', 'Fiji', 'Finland', 'France', 'French Guiana', 'French Polynesia', 'French Southern Territories', 'Gabon', 'Gambia', 'Georgia', 'Germany', 'Ghana', 'Gibraltar', 'Greece', 'Greenland', 'Grenada', 'Guadeloupe', \
'Guam', 'Guatemala', 'Guernsey', 'Guinea', 'Guinea-Bissau', 'Guyana', 'Haiti', 'Heard Island and McDonald Islands', 'Vatican','Holy See (Vatican City State)', 'Honduras', 'Hong Kong', 'Hungary', 'Iceland', 'India', 'Indonesia', \
'Iran','Iran, Islamic Republic of', 'Iraq', 'Ireland', 'Isle of Man', 'Israel', 'Italy', 'Jamaica', 'Japan', 'Jersey', 'Jordan', 'Kazakhstan', 'Kenya', 'Kiribati', "Korea, Democratic People's Republic of", 'Korea, Republic of', \
'Kuwait', 'Kyrgyzstan', "Lao People's Democratic Republic", 'Latvia', 'Lebanon', 'Lesotho', 'Liberia', 'Libya', 'Liechtenstein', 'Lithuania', 'Luxembourg', 'Macao', 'Macedonia','Macedonia, Republic of', 'Madagascar', 'Malawi', \
'Malaysia', 'Maldives', 'Mali', 'Malta', 'Marshall Islands', 'Martinique', 'Mauritania', 'Mauritius', 'Mayotte', 'Mexico', 'Micronesia','Micronesia, Federated States of', 'Moldova','Moldova, Republic of', 'Monaco', 'Mongolia', \
'Montenegro', 'Montserrat', 'Morocco', 'Mozambique', 'Myanmar', 'Namibia', 'Nauru', 'Nepal', 'The Netherlands','Holland','Netherlands', 'Caledonia','New Caledonia', 'New Zealand', 'Nicaragua', 'Niger', 'Nigeria', 'Niue',\
'Norfolk Island', 'Northern Mariana Islands', 'Norway', 'Oman', 'Pakistan', 'Palau', 'Palestin','Palestinian Territory, Occupied', 'Panama', 'Papua New Guinea', 'Paraguay', 'Peru', 'Philippines', 'Pitcairn', 'Poland', \
'Portugal', 'Puerto Rico', 'Qatar', 'Reunion','Réunion', 'Romania', 'Russia','Russian Federation', 'Rwanda', 'Saint Barthélemy','Saint Helena','Saint Helena, Ascension and Tristan da Cunha', 'Saint Kitts and Nevis', \
'Saint Lucia', 'Saint Martin','Saint Martin (French part)', 'Saint Pierre and Miquelon', 'Saint Vincent and the Grenadines', 'Samoa', 'San Marino', 'Sao Tome and Principe', 'Saudi Arabia', 'Senegal', 'Serbia', 'Seychelles', \
'Sierra Leone', 'Singapore', 'Sint Maarten (Dutch part)', 'Slovakia', 'Slovenia', 'Solomon Islands', 'Somalia', 'South Africa','South Georgia','South Georgia and the South Sandwich Islands', 'Spain', 'Sri Lanka', 'Sudan',\
'Suriname', 'South Sudan', 'Svalbard and Jan Mayen', 'Swaziland', 'Sweden', 'Switzerland', 'Syrian Arab Republic', 'Taiwan, Province of China', 'Tajikistan', 'Tanzania, United Republic of', 'Thailand', 'Timor-Leste', 'Togo',\
'Tokelau', 'Tonga', 'Trinidad and Tobago', 'Tunisia', 'Turkey', 'Turkmenistan', 'Turks and Caicos Islands', 'Tuvalu', 'Uganda', 'Ukraine', 'United Arab Emirates', 'United Kingdom', 'United States',\
'United States Minor Outlying Islands', 'Uruguay', 'Uzbekistan', 'Vanuatu', 'Venezuela, Bolivarian Republic of', 'Viet Nam','Virgin Islands','Virgin Islands, British', 'Virgin Islands, U.S.',\
'Wallis and Futuna', 'Yemen', 'Zambia', 'Zimbabwe','Myanmar','Burma']

In [None]:
# if the country is not in the list of countries, then it is set to unknown  # if the country is in the state or city, then it is set to country
df = df.withColumn("country", F.when(F.col("country").isin(list_country), value= F.col("country")).otherwise("Unknown"))

df=df.withColumn("country", F.when((F.col("country") == "Unknown")& F.col("state").isin(list_country), F.col("state")).otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")& F.col("city").isin(list_country), F.col("city")).otherwise(F.col("country")))

df.groupBy("country").count().orderBy(F.col("count").desc()).show(200)

+--------------------+------+
|             country| count|
+--------------------+------+
|       United States|139201|
|              Canada| 21558|
|      United Kingdom| 18304|
|             Germany| 17058|
|               Spain| 13211|
|           Australia| 11726|
|               Italy| 11263|
|             Unknown|  6713|
|              France|  3484|
|            Portugal|  3372|
|         New Zealand|  3100|
|         Netherlands|  3046|
|         Switzerland|  1760|
|              Brazil|  1677|
|               China|  1482|
|              Sweden|  1454|
|               India|  1284|
|             Austria|  1148|
|            Malaysia|  1102|
|           Argentina|  1081|
|             Finland|   942|
|           Singapore|   934|
|             Denmark|   856|
|              Mexico|   827|
|             Belgium|   820|
|             Ireland|   755|
|         Philippines|   725|
|              Turkey|   497|
|              Poland|   458|
|            Pakistan|   436|
|         

In [None]:
df.coalesce(1).write.option("header" ,"True").csv("/content/drive/MyDrive/spark/users_file.csv","overwrite",header =True)

In [None]:
#reading csv file to dataframe
spark = SparkSession.builder.appName("users").getOrCreate()

df = spark.read.options(delimiter=',')\
  .csv("/content/drive/MyDrive/spark/users_file.csv/part*.csv", header= True)
df.printSchema()
df.show()

root
 |-- User_ID: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)

+-------+----+--------------+---------------+--------------+
|User_ID| Age|          city|          state|       country|
+-------+----+--------------+---------------+--------------+
|      1|null|           Nyc|       New York| United States|
|      2|  18|      Stockton|     California| United States|
|      3|null|        Moscow|Yukon Territory|        Russia|
|      4|  17|         Porto|        Unknown|      Portugal|
|      5|null|   Farnborough|          Hants|United Kingdom|
|      6|  61|  Santa Monica|     California| United States|
|      7|null|    Washington|             Dc| United States|
|      8|null|       Timmins|        Ontario|        Canada|
|      9|null|    Germantown|      Tennessee| United States|
|     10|  26|      Albacete|      Wisconsin|         Spain|
|     11|  14| 

In [None]:
# reading csv file
state_df = spark.read.csv("/content/drive/MyDrive/spark/us-states-territories.csv", header=True, inferSchema=True)

#creating a list of Us states and territories
list_states = ['Dc','United Stated','Usa','District Of Columbia','Us Army','United Stated Of America','United States Of America']
for i in range(0, state_df.count()):
  state = state_df.collect()[i][1]
  if state[0]==' ':
    state = state[1:]
    list_states.append(state)
  else:
    list_states.append(state)
# reading csv file
city_df = spark.read.csv("/content/drive/MyDrive/spark/uscities.csv", header=True, inferSchema=True)

#creating a list of Us cities
cities = city_df.select("city").collect()
cities_list = ['Us Virgin Islands','Chesterfield South Carolina','Ny','Nyc','Northern Virginia','New York City','Halethorpe','Ft','Newalla','Stone Mtn','S Charleston','Diberville','Sherman Oaks','Leucadia','Upper Darby',\
'Teaneck','Tx','Tewksbury','Potomac Falls','Coconut Grove','O Fallon','Paynesville Mn','Port Hadlock','Horseheads','Penacook','Mcalester','The Bronx','Winston-salem','Mo','Johntown','Mililani','Cheektowaga','Bloomfield Po',
'Rockaway','East Greenwich','Allston','Wilkes-barre','Northborough','Philadelphia Area','Mcminnville','Kailua-kona','Little Neck','Goffstown','Paso Robles','Saint Louis Park','Kamuela','King Of Prussia','Studio City','Lakebay','Rhode St',\
'Oxon','West Hartford','Wilkes Barre','Fresh Meadows','Wi','Mahwah','Phila','Brentwood Bay','So Burlington','Fl','Sandiego','East Boston','Islesboro','Pepperell','Tolland','Davis Bay','Plaistow','New Bru','Swisshome','Tn','North Hollywood',\
'Md','Brattleboro','Oak Cliff','Ellenwood','Nj','Topsham','Kalifornien','Ut','King Court','Newport Coast','Halethorpe','Canton Ct','Derbys','Oh','Southbridge','Shasta Lake City','Ringle','La Jolla','Bridgewater Corners','Mpls',\
'The Elk Valley','Mckinney','Kill Devils Hills','Toms River','Abington','Huddleston','South Hadley','Bowdoin','Podunk','Skippack','Uxbridge','Anaheim Hills','Deland','Myers','Ponte Vedra','Ma','Mount Laurel','New Ferry',\
'West Vancouver','Bel Alton','Noxen',]
for city in cities:
    cities_list.append(city[0])
# reading csv file
uk_city_df = spark.read.csv("/content/drive/MyDrive/spark/uk_cities.csv", header=True, inferSchema=True)

#creating a list of Uk cities
uk_cities = uk_city_df.select("city").collect()
uk_cities_list = ['Hertfordshire','England','Northern Ireland','Scotland', 'wales','Uk','uk','Nottinghamshire','East Sussex','Derbyshire','Leicestershire','Greater London','Berkshire','Wiltshire','British Virgin Islands','West Midlands','Bedfordshire','Middx','Lancashire'\
,'Cumbria','Denbighshire County','Gloucestershire','West Yorkshire','Cambridgeshire','England Uk','Wooler','Appleby Magna','Llanymddyfri','Romford','Milfield','Merseyside','Kilbarchan','Scone','Upavon','Bridgend','Bryncrug',\
'Gainsborough','Downham Market','Northamptonshire','Aberdeenshire','Powys','Stourport On Severn','Southery','Cleckheaton','Renfrewshire','East Ayrshire','Rhondda Cynon Taff','Pontyclun','Lyneham','Horncastle','Scottish Borders','Llanbradach',\
'East Finchley','Tarland','Much Hadham','Heckmondwike','West Sussex','Rudgwick','Lincs','Balivanich','Argyll','Crickhowell','Isle Of Benbecula','Mid Glamorgan','Ackleton','St Albans','Cowbridge','Campbeltown','Forest Of Dean',\
'Nefyn','Wirral','Bushey','Beddau','Herts','Radcliffe On Trent','Highclere','Gwynedd','Bushey Heath','Stockton-on-tees','Northwich','Carmarthenshire','Worcestershire','Lurgan','Bicester','Pontardawe','Dinas Mawddwy','Manningtree',\
'Ok','North Yorkshire','Aldergrove','Houghton Regis','Orkney','Aberfeldy','Folkestone Kent','Redbourn','Oxfordshire','Lauder','West Glamorgan','Isle Of Wight','Letchworth','Smarden','Lochgilphead','Mold','Oxton','Braunston',\
'County Armagh','Dunstable','Flintshire','Caerphilly','West Kilbride','Newcastle Upon Tyne','Todmorden','Road Town','Tortola','Little Bay','Virgin Gorda']
for city in uk_cities:
    uk_cities_list.append(city[0])
# reading csv file
de_city_df = spark.read.csv("/content/drive/MyDrive/spark/de_state_cities.csv", header=True, inferSchema=True)

#creating a list of german cities and german states
de_cities = de_city_df.select("city").collect()
de_cities_list = ['Deutschland','Nordrhein-westfalen','Niedersachsen','Nordrheinwestfalen','Rheinland-pfalz','Hessen','Baden-wuerttemberg'\
,'Schleswig-holstein','Nordrhein Westfalen','Nordrhein Westfalen','Sachsen','North Rhine Westphalia','Ruhrgebiet','Region Stuttgart','Cambs'\
,'Nrw','Der','Bayern','North Rhine Westfalia','Frankfurt Am Main','Muenchen','Landstuhl','Wuerzburg','Friedrichsthal','Hohenhameln','Spangdahlem','Duesseldorf','Lueneburg','Freiburg','Friedersdorf','Roetgen','Grossenseebach','Inzlingen',\
'Goeppingen','Sachsen-anhalt','Scheden','Monakam','Pfullingen','Graefelfing','Hitzacker','Wellendingen','Bargenstedt','Schliersee','Rheinland Pfalz','Bad Liebenzell','Ludwigshafen Am Rhein','Meersburg','Bis']
for city in de_cities:
    de_cities_list.append(city[0])

de_states =de_city_df.select("state").collect()

for state in de_states:
  if state not in de_cities_list:
    de_cities_list.append(state[0])

# reading csv file
ca_city_df = spark.read.csv("/content/drive/MyDrive/spark/canadacities.csv", header=True, inferSchema=True)

#creating a list of german cities and canadian states
ca_cities = de_city_df.select("city").collect()
ca_cities_list = ['Yukon Territory','Sudbury','North Vancouver','V I C T O R I A','Territorio De Yukon','Canada Eh','Bc','Il Canada','Ouranos','La Quebec','Block-o','Saskatoon','Fredericton','Calgary','Cambre','Aylmer','Powell River',\
'Manotick','Seba Beach','Seabright','Leduc','La Ronge','Ancaster','Burnaby','Chilliwack','Saanichton','Brampton','Arnprior','Military Base','Wolfville','Ajax','Shawnigan Lake','Three Hills','Revelstoke','Prince Albert','Melfort',\
'Didsbury','St-narcisse','Charlesbourg','Waterdown','Celista','Brockville','Winnipeg','Kincardine','Kamloops','Glace Bay','Langham','Carrot Creek','Banff','Bobcaygeon','Beaverdell','Charlottetown','Qualicum Bay','Bocabec','Barrie',\
'Tsawwassen','Lethbridge','Coquitlam','Campbell River','Moncton','Campbellford','Port Coquitlam','Nanaimo','Comox','Belle Vallee','Valemount','Vanderhoof','Pechina','Creston Bc','Ladner','Guelph','Keswick Ridge','Mississauga',\
'Kitchener','Kelowna','Brantford']
for city in ca_cities:
    ca_cities_list.append(city[0])

ca_province =ca_city_df.select("province_name").collect()

for province in ca_province:
  if province not in ca_cities_list:
    ca_cities_list.append(province[0])
# reading csv file
in_city_df = spark.read.csv("/content/drive/MyDrive/spark/in.csv", header=True, inferSchema=True)

#creating a list of german cities and indian states
in_cities = in_city_df.select("city").collect()
in_cities_list = ['Maharashtra','New Mumbai','Bombay','Eastanadbagh','Balcistsa','Tamil Nadu','Bambai', 'Varanasi','Nagercoil','Ulhasnagar','Bokaro','Hyderabad']
for city in in_cities:
    in_cities_list.append(city[0])

in_state = in_city_df.select("state").collect()

for state in in_state:
  if state not in in_cities_list:
    in_cities_list.append(state[0])
# reading csv file
au_city_df = spark.read.csv("/content/drive/MyDrive/spark/au.csv", header=True, inferSchema=True)

#creating a list of german cities and austrelian states
au_cities = au_city_df.select("city").collect()
au_cities_list = ['Canberra','Upper Austria','Alderney','Country','Qld','Burwood','Nowhere','Barham','Hungary And Usa','Mordor','World','Nsw','Sydney','Caringbah','Naracoorte','Chifley','Sunshine Coast','Patterson Lakes',\
'Mitcham','Stawell','Romsey','Dulwich Hill','Monbulk','Stratham','Tea Tree Gully','Glen Osmond','Coronet Bay','Nyora','Merricks','North Ringwood','Coral Bay','Highton','Surry Hills','Heidelberg Heights','Horsham','Geelong',\
'Newbury Park','Gibsons','Mcarthur River','Waverton','Seven Mile Beach','Holloways Beach','Byron Bay','Adelaide','Black Lake','Crediton','Maryborough','Rockhampton','Kangarilla','Bankstown','Buderim','Rothwell','Werribee','Moon','Ermington',\
'Cairns','Medowie']
for city in in_cities:
    au_cities_list.append(city[0])

au_state =au_city_df.select("state").collect()

for state in au_state:
  if state not in au_cities_list:
    au_cities_list.append(state[0])

In [None]:
italy_list =['Sardegna','Emilia','Itlay','Reggio Emilia','Vatican City','Liguria','Italia','Roma Lazio','Lombardia','Lazio','Toscana','Emilia Romagna','Sicilia','Firenze','Veneto','Genova','Calabria','Campania','Il Lazio','Venezia Giulia','Marche','Pesaro Urbino','Gorizia','Vicenza','South Milano'\
,'Deep Padania','Piemonte','Basilicata','Friuli Venezia Giulia','La Puglia','Puglia','Varese','Italien','Molise','Rmi','Cuneo','Rosello','Umbria','La Liguria','Il Veneto','Trentino Alto Adige','Region Centro'\
,'Impruneta','Gubbio','Udine','Forli','Caorso','Arcore','Crema','Terni','Trieste','Bologna','Lecce','Salerno','Ferrara','Sambro','Bellusco','Sesto San Giovanni','Scanzorosciate','Livorno','Potenza','Cernusco Sul Naviglio','Abbiategrasso',\
'Bellaria-igea Marina','Grosseto','Ortona','Cagliari','Lentini','Catania','Finale Ligure','Morlupo','Melzo','Cologno Monzese','Aviano','Cadoneghe','Cinisello Balsamo','Pisa','San Martino Di Lupari','Quarona','Fiumicino',\
'Busto Arsizio','Zanica','Sarzana','Morazzone','Senigallia','Magliano Di Tenna','Santa Margherita Ligure','Cesena','Monza','Pioltello','Fiesole','Brescia','Grandola Ed Uniti','Caserta','Vercelli','Portogruaro','Parona','Napoli','Novara',\
'Trezzano Sul Naviglio','Bergamo','Tiana','Torino','Venedig','Latina','Priverno','Bari','Siena','Casalgrande','Sarentino','Cernusco S','Venezia','Treviso','Padova','Mirano','Terracina','Rivoli','Vignate','Perugia','Pordenone','Imola','Orvieto']
spain_list =['Santa Cruz De Tenerife','Basque Country','Gipuzkoa','Barcelona','Andalucia','Canarias','Catalunya','Bizkaia','Catalonia','Asvinenea','Santa Cruz De Tenerife','Galiza','A Corunha','Comunidad Valenciana','Estremadura'\
,'Pais  Vasco','Alava','Pontevedra','Extremadura','Pais Vasco','Sevilla','Zaragoza','Euskal Herria','Palma De Mallorca','Asturies','Asturias','Huesca','Santiago De Compostela','Nafarroa','Galicia','Lleida','Orense','El Bierzo','Vic'\
,'Gerona','Navarra','Burgos','Guipuzcoa','Illes Balears','Xove','Cantabria','Palma','Girona','Huelva','Valladolid','Baleares','Araba-basque Country','Islas Baleares','Badajoz','Euskadi','Illes Baleares','Tenerife'\
,'El Vendrell','Mahon','Mungia','San Sebastian','Compostela','Ferrol','Santander','Vizcaya','Ourense','Vitoria - Gasteiz','Torredembarra','Gandia','Olesa De Montserrat','Igualada','Donostia','Getafe','Valence','Olesa','Berga','Gasteiz',\
'Tarragona','Vitoria','Terrassa','Murcia','Benicassim-castellon','Liencres','Basauri','Gijon','Bilbao','Vitoria-gasteiz','Ciempozuelos','El Puerto De Santa Maria']
brazil_list= ['Rio De Janeiro','Brasil','Minas Gerais','Sao Paulo','Nordeste','Df','Distrito Federal','Fortaleza','Pernambuco','Mafra','Ap','Rio Grande Do Sul','Curitiba','Brasilia','Campinas','Sao Tome','Bahia','Rubiataba',\
'Bom Jesus Da Lapa-bahia','South Saint Paul']
China_list = ['Guandong Province','Wuhan','Zhejiang','Guang Xi','Guangdong','Yunling','Liushi','Beijing','Cn','Dalian','Jiangxi','Jilin','Yu','Shanghai','Hebei','Guangzhou','Xinyu','Hubei','Hb','Zhanjiang','Shanghai  China'\
,'Jiangsu Province','Mudanjiang','Hainan','Jiangsu','Fujian','Hunan','Shanxi Province','Huaihua','Zhengjiang','Yu-song','Jx','Nanjing','Liaoning','P R China','Harbin','He Nan','Haiku','Tianjin','Taiyuan','Bj China','Qingdao',\
'Taixing','Zheng Zhou','Wuzhishan City']
france_list = ['Nord-pas-de-calais','Alsace','Montsegur','Ile De France','Haute-savoie','Essonne','Brittany','Aveyron','Frolois','Bretagne','Ile-de-france','Bourgogne','St Cloud','Haut-rhin','Provence','Haute-garonne','Quesnel',\
'Nantes','Nouvelle Eglise','Cachen','Arnouville Les Gonesse','Laxou','Le Cannet','Finistere','Villejuif','Montmartre','Orly','Muret','Val De Marne','Strasbourg','Normandie','Reims','Sainte-foy','Toulouse','Les Ulis','Pantin','Bourbriac',\
'Marseille','Roquesteron-grasse','Belfort','Lille','Hyeres','Rhone Alpes','Saint Cloud','Aix Les Bains','Chatelaillon-plage','Nancy','Marnefer','Bourg En Bresse','Franche Comte','Mazieres-en-gatine','Leers','St Jean Le Vieux',\
'Montpellier','Haute Garonne','Rennes','Dijon','Pau']
Belgium_list = ['Bruxelles','Namur','Bruselas','Belgique','Vlaams-brabant','Belgi','Hainaut','Liege','Armed Forces-europe','Antwerpen','West-vlaanderen','Province Du Luxembourg']
Portugal_list= ['Porto','Tavira','Lisboa','Mem-martins','Loures','Ribatejo','Coimbra','Figueira Da Foz','Oeiras','Sintra','Real','Almada','Beira Litoral','Gaia','Oliveira Do Hospital','Alentejo','Minho','Sesimbra','Aveiro',\
'Alto Alentejo','Quinta Do Conde','Vila Real','Barreiro','Vila Nova Da Barquinha','Esposende','Arruda Dos Vinhos','Mos','Caldas Da Rainha','Carnaxide','Maia','Oporto','Costa Da Caparica']
Sweden_list = ['Norrbotten','Skaraborg','Vaxjo','Vastra Gotaland','Norbotten','Dalarna','Smaland','Goteborg','Halland','Gotland','Uppland']
Switzerland_list = ['Geneve','Canton De Vaud','Zh','Gallen','Leman Lake','Aargau','Vaud','Ticino','Basel Landschaft','Suisse','Valais','Tessin','Solothurn','Wangen','Piazzogna','Oberurnen','Luzern','Tralala']
Philippines_list =['Binangonan','Rizal','Pooc Occidental','Batangas Province','Bulacan','Marulas','Cebu City','Philippine','Pasig','Bicol','Pampanga','Metro Manila','Agusan Del Sur','Visayas Region','Benguet','Cavite','Cebu',\
'Tacloban City','Leyte','Makati','Cebu Province Philippines','Sagadan','Baliuag','Apo','Naga City','Tarlac','Mandaluyong','Pasay City','Muntinlupa City','Mariveles','Iligan City','Quezon City','Dagupan City','Mandaluyong City']
Nigeria_list = ['Lagos State','Lagos','Ilorin','Maiduguri','Capital Federal','Ondo State','Oyo State','Kwara  State','Calabar - Nigeria','Rivers State','Federal Territory','Imo State','Mainland','Uyo','Kwara State','Los Lagos']
New_zealand_list = ['Waikato','Ashburton','North Auckland','Manawatu','Petone','Nz','Fred','Northland','South Island','North Island','Otago','Auckland','Central Otago','Taranaki','Motueka','Charteris Bay','Rotorua','Tauranga',\
'Maui','Lyttelton Harbour','Temuka','Palmerston North','Timaru','Gisborne','Huntly','Lower Hutt','Hanmer','Waipukurau']
Netherlands_list = ['Etten - Leur','Noord-holland','Flevoland','Zuid-holland','Gelderland','Brabant','Noord-brabant','Noord Holland','Twente','Groningen','Nord Holland','Overijssel','Noord Brabant','Drenthe','Utrecht',\
'Wide','Zuid Holland','Heerlen','Huizen','Amersfoort','Amstelveen','Enschede','Eindhoven','Dronten','Leiden','Capelle Ad Ijssel','The Hague','Den Haag','Voorschoten','Boskoop','Hellevoetsluis']
Mexico_list =['Nuevo Leon','Monterrey','Ciudad De Mexico','Culiacan','Neverland','Mexico City','Tecamac','Sinaloa','Jalisco','Puebla','Zapopan','Michoacan','Estado De Mexico','Veracruz','Morelos' 'Guadalajara','Alburquerque','Naucalpan',\
'Pantitlan','A Estrada','Cuernavaca', 'Morelos', 'Guadalajara']
Japan_list = ['Chiba Ken','Japan Military','Okinawa','Nakano','Ota','Tokyo','Kanagawa-ken','Fukuoka','Iwakuni','Kanagawa','Hagiwara','Yokota Air Base', 'Aki', 'Mizunami', 'Chiba']
Turkey_list =['Aydin','Turkei','Mugla','Eskisehir','Ilkadim','Ankara','Tuerkei','Istanbul']
Malaysia_list =['Selangor','Selangor Darul Ehsan','Perlis','Johor','Kuala Lumpur','Negeri Sembilan','Malaysian','Penang','Alor Star','Kedah','Perak','Ipoh','Sarawak','Kluang','Subang Jaya','Muar','Kota Kinabalu','Melaka','Alor Setar',\
'Seremban','Petaling Jaya','Kangar','Batang Kali','Cyberjaya']
Indonesia_list = ['Jawa Tengah','Nias Island','Sorry','Yogyakarta','Nederlands','Makassar','Jawa Barat','Netherland','Id','Temanggung','Jawa-tengah','North Sumatera','Jakarta','Mdc','Town Of Bali','Surabaya','South Sulawesi'\
,'Jkt','Malang','Bandung','Bundang','Medan']
Argentina_list = ['Caseros','Buenos Aires','Alta Gracia','Moron','Lanus','Cordoba','Mar Del Plata']
Austria_list = ['Salzburg','Tirol','Wilfleinsdorf','Niederoesterreich','Desselbrunn','Wien','Schwaz','Wolfsberg','Innsbruck']
Chile_list = ['Calama','La Serena','Region Metropolitana','Curanilahue','San Vicente De Tagua Tagua','Coyhaique']
Croatia_list = ['Kraljevica','Slavonski Brod','Rijeka','Kvarner','Zagreb','Slavonija','Trnovec','Horvatija']
Denmark_list = ['Randers','Esbjerg','Svendborg','Nordborg','Jutland','Aarhus','Skive','Bogoe','Frederiksberg']
Finland_list = ['Helsinki','Pori','Turku','Hollola','Uusimaa','Tampere','Lahti']
Ireland_list = ['Letterkenny','Dunmanway','Tullow','Recess','Limerick','Castlecomer','Connemara','Castlegar']
South_africa_list = ['Kwazulu-natal','Cape Town','Mafikeng','Durban','Durbanville','Western Cape','Edgemead']
Tanzania_list = ['Morogoro','Zanzibar','Tanzania','Dar Es Salam','Stone Town','Dar Es Salaam','Dar-es-salaam','Daressalam','Kilimanjaro','Forest Falls']

In [None]:
df=df.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("state").isin(list_states),"United States").otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("city").isin(list_states),"United States").otherwise(F.col("country")))

df=df.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("city").isin(ca_cities_list),"Canada").otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("state").isin(ca_cities_list),"Canada").otherwise(F.col("country")))

df=df.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("state").isin(uk_cities_list),"United Kingdom").otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("city").isin(uk_cities_list),"United Kingdom").otherwise(F.col("country")))

In [None]:
df.coalesce(1).write.option("header" ,"True").csv("/content/drive/MyDrive/spark/users_file1.csv","overwrite",header =True)

In [None]:
#reading csv file to dataframe
spark = SparkSession.builder.appName("users").getOrCreate()

df = spark.read.options(delimiter=',')\
  .csv("/content/drive/MyDrive/spark/users_file1.csv/part*.csv", header= True)
df.printSchema()
df.show()

root
 |-- User_ID: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)

+-------+----+--------------+---------------+--------------+
|User_ID| Age|          city|          state|       country|
+-------+----+--------------+---------------+--------------+
|      1|null|           Nyc|       New York| United States|
|      2|  18|      Stockton|     California| United States|
|      3|null|        Moscow|Yukon Territory|        Russia|
|      4|  17|         Porto|        Unknown|      Portugal|
|      5|null|   Farnborough|          Hants|United Kingdom|
|      6|  61|  Santa Monica|     California| United States|
|      7|null|    Washington|             Dc| United States|
|      8|null|       Timmins|        Ontario|        Canada|
|      9|null|    Germantown|      Tennessee| United States|
|     10|  26|      Albacete|      Wisconsin|         Spain|
|     11|  14| 

In [None]:
df=df.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("city").isin(de_cities_list),"Germany").otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("state").isin(de_cities_list),"Germany").otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("city").isin(au_cities_list),"Australia").otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("state").isin(au_cities_list),"Australia").otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("city").isin(spain_list),"Spain").otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("state").isin(spain_list),"Spain").otherwise(F.col("country")))

In [None]:
df.coalesce(1).write.option("header" ,"True").csv("/content/drive/MyDrive/spark/users_file2.csv","overwrite",header =True)

In [None]:
#reading csv file to dataframe
spark = SparkSession.builder.appName("users").getOrCreate()

df = spark.read.options(delimiter=',')\
  .csv("/content/drive/MyDrive/spark/users_file2.csv/part*.csv", header= True)
df.printSchema()
df.show()

root
 |-- User_ID: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)

+-------+----+--------------+---------------+--------------+
|User_ID| Age|          city|          state|       country|
+-------+----+--------------+---------------+--------------+
|      1|null|           Nyc|       New York| United States|
|      2|  18|      Stockton|     California| United States|
|      3|null|        Moscow|Yukon Territory|        Russia|
|      4|  17|         Porto|        Unknown|      Portugal|
|      5|null|   Farnborough|          Hants|United Kingdom|
|      6|  61|  Santa Monica|     California| United States|
|      7|null|    Washington|             Dc| United States|
|      8|null|       Timmins|        Ontario|        Canada|
|      9|null|    Germantown|      Tennessee| United States|
|     10|  26|      Albacete|      Wisconsin|         Spain|
|     11|  14| 

In [None]:
df=df.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("city").isin(italy_list),"Italy").otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("state").isin(italy_list),"Italy").otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("city").isin(france_list),"france").otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("state").isin(france_list),"france").otherwise(F.col("country")))

In [None]:
df = df.withColumn("country",  F.when((F.col("country") == "Unknown")& F.col("city").isin(cities_list),"United States").otherwise(F.col("country")))\
.withColumn("country", F.when((F.col("country") == "Unknown")&F.col("state").isin(cities_list),"United States").otherwise(F.col("country")))

In [None]:
df.groupBy("country").count().orderBy(F.col("count").desc()).show(200)

+--------------------+------+
|             country| count|
+--------------------+------+
|       United States|142689|
|              Canada| 22049|
|      United Kingdom| 18985|
|             Germany| 17136|
|               Spain| 13432|
|           Australia| 11900|
|               Italy| 11581|
|              France|  3484|
|            Portugal|  3372|
|         New Zealand|  3100|
|         Netherlands|  3046|
|         Switzerland|  1760|
|              Brazil|  1677|
|               China|  1482|
|              Sweden|  1454|
|               India|  1284|
|             Unknown|  1198|
|             Austria|  1148|
|            Malaysia|  1102|
|           Argentina|  1081|
|             Finland|   942|
|           Singapore|   934|
|             Denmark|   856|
|              Mexico|   827|
|             Belgium|   820|
|             Ireland|   755|
|         Philippines|   725|
|              Turkey|   497|
|              Poland|   458|
|            Pakistan|   436|
|         

In [None]:
# if age is bigger then 100 or smaller then 6 will be set to -1 as all the Unknown
df =df.withColumn("Age", F.when(F.col("Age").isNull(), -1).otherwise(F.col("Age")))\
.withColumn("Age", F.when(F.col("Age")> 100, -1).otherwise(F.col("Age")))\
.withColumn("Age", F.when(F.col("Age")< 6, -1).otherwise(F.col("Age")))
df.show()

+-------+---+--------------+---------------+--------------+
|User_ID|Age|          city|          state|       country|
+-------+---+--------------+---------------+--------------+
|      1| -1|           Nyc|       New York| United States|
|      2| 18|      Stockton|     California| United States|
|      3| -1|        Moscow|Yukon Territory|        Russia|
|      4| 17|         Porto|        Unknown|      Portugal|
|      5| -1|   Farnborough|          Hants|United Kingdom|
|      6| 61|  Santa Monica|     California| United States|
|      7| -1|    Washington|             Dc| United States|
|      8| -1|       Timmins|        Ontario|        Canada|
|      9| -1|    Germantown|      Tennessee| United States|
|     10| 26|      Albacete|      Wisconsin|         Spain|
|     11| 14|     Melbourne|       Victoria|     Australia|
|     12| -1|    Fort Bragg|     California| United States|
|     13| 26|     Barcelona|      Barcelona|         Spain|
|     14| -1|    Mediapolis|           I

In [None]:
from pyspark.sql.types import IntegerType , StringType

#changing the data type of the columns
df = df.withColumn("User_ID", F.col("User_ID").cast(IntegerType()))\
.withColumn("Age", F.col("Age").cast(IntegerType()))

In [None]:
df = df.withColumn("AgeGroups",F.lit("Unknown"))

In [None]:
df =df.withColumn("AgeGroups", F.when((F.col("Age") <12) & (F.col("Age")> 0), 'Children').otherwise(F.col("AgeGroups")))\
.withColumn("AgeGroups", F.when((F.col("Age") >=12) & (F.col("Age") <20), 'Teen').otherwise(F.col("AgeGroups")))\
.withColumn("AgeGroups", F.when((F.col("Age") >=20) & (F.col("Age") <30), 'YoungAdults').otherwise(F.col("AgeGroups")))\
.withColumn("AgeGroups", F.when((F.col("Age") >=30) & (F.col("Age") <65), 'Adults').otherwise(F.col("AgeGroups")))\
.withColumn("AgeGroups", F.when(F.col("Age") >=65, 'Senior').otherwise(F.col("AgeGroups")))

In [None]:
df.show()

+-------+---+--------------+---------------+--------------+-----------+
|User_ID|Age|          city|          state|       country|  AgeGroups|
+-------+---+--------------+---------------+--------------+-----------+
|      1| -1|           Nyc|       New York| United States|    Unknown|
|      2| 18|      Stockton|     California| United States|       Teen|
|      3| -1|        Moscow|Yukon Territory|        Russia|    Unknown|
|      4| 17|         Porto|        Unknown|      Portugal|       Teen|
|      5| -1|   Farnborough|          Hants|United Kingdom|    Unknown|
|      6| 61|  Santa Monica|     California| United States|     Adults|
|      7| -1|    Washington|             Dc| United States|    Unknown|
|      8| -1|       Timmins|        Ontario|        Canada|    Unknown|
|      9| -1|    Germantown|      Tennessee| United States|    Unknown|
|     10| 26|      Albacete|      Wisconsin|         Spain|YoungAdults|
|     11| 14|     Melbourne|       Victoria|     Australia|     

In [None]:
df.coalesce(1).write.option("header" ,"True").csv("/content/drive/MyDrive/spark/users_file3.csv","overwrite",header =True)

In [None]:
# Stop the session --> similar to saving our doings in spark layer
spark.stop()

In [None]:
# generate a df of all the rows where the country is 'Unknown
df_unknown = df.filter((F.col("country") == 'Unknown'))
# creating a list of cities and states
list_of_cities = []
for row in df_unknown.collect():
    list_of_cities.append(row.city)
    list_of_cities.append(row.state)
list_of_cities = list(set(list_of_cities))
len(list_of_cities)    

In [None]:
# creating a dictionery of 
import requests
import re

city_list = list_of_cities
city_country_dict = {}
country_city_dict = {}
for city in list_of_cities:
  try:
    response = requests.request("GET", f"https://www.geonames.org/search.html?q={city}&country=")
    country = re.findall("/countries.*\.html", response.text)[0].split("/")[-1].split(".")[0].capitalize()
    if country not in country_city_dict:
        country_city_dict[country] = [city]
    else:
        country_city_dict[country].append(city)
    city_country_dict[city] = country
  except:
    pass



In [None]:
country_city_dict


In [None]:
city_country_dict

In [None]:
# generate a df of all the rows where the country is 'Unknown
df_unknown = df.filter((F.col("country") == 'Unknown')&((F.col("state") != 'Unknown')|(F.col("city") != 'Unknown')))

# creating a list of states
list_of_states = []
for row in df_unknown.collect():
    list_of_states.append(row.state)

list_of_states = list(set(list_of_states))
len(list_of_states)    

In [None]:
# creating a dictionery of 
import requests
import re

city_list = list_of_cities
city_country_dict = {}
country_city_dict = {}
for city in list_of_cities:
  try:
    response = requests.request("GET", f"https://www.geonames.org/search.html?q={city}&country=")
    country = re.findall("/countries.*\.html", response.text)[0].split("/")[-1].split(".")[0].capitalize()
    if country not in country_city_dict:
        country_city_dict[country] = [city]
    else:
        country_city_dict[country].append(city)
    city_country_dict[city] = country
  except:
    pass


In [None]:
# saving the dictionary 
import csv
my_dict = city_country_dict
with open('city_country_dict.csv', 'w') as f:
    for key in my_dict.keys():
        f.write("%s,%s\n"%(key,my_dict[key]))


In [None]:
# saving the dictionary 
import csv
my_dict = country_city_dict
with open('country_city_dict.csv', 'w') as f:
    for key in my_dict.keys():
        f.write("%s,%s\n"%(key,my_dict[key]))

In [None]:
# reading the dictionary file
city_country = spark.read.csv("/content/drive/MyDrive/spark/city_country_dict.csv",header=True, inferSchema=True)

#creating a dictionary of city and country from city_country_dict
city_country_dict = {}
for row in city_country.collect():
    city_country_dict[row[0]] = row[1]