# Cleaning countries

In [1]:
#imports
import re
import pandas as pd
import numpy as np
import scipy as sp
import scipy.stats as stats
import matplotlib.pyplot as plt
from datetime import timedelta

In [2]:
import findspark
findspark.init()
import pyspark

from functools import reduce
from pyspark.sql import *
from pyspark.sql import functions as F
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.functions import min
from pyspark.sql.functions import to_date, last_day,date_add
from datetime import timedelta

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [36]:
DATA_FOLDER = 'data'
openfood_file = "/en.openfoodfacts.org.products.csv"
cities_file = "/worldcitiespop.csv"
countries_file = "/GEODATASOURCE-COUNTRY.csv"

# Loading data

In [4]:
dataset_main = spark.read.csv(DATA_FOLDER+ openfood_file, header=True, mode="DROPMALFORMED", sep = '\t')

dataset_main.createOrReplaceTempView("data_main")

# Filter required columns
p_id_col = " code, "
general_cols = " brands, brands_tags, categories, categories_tags, origins, origins_tags, manufacturing_places, manufacturing_places_tags,labels,labels_tags,emb_codes,emb_codes_tags,first_packaging_code_geo,cities,cities_tags,purchase_places,stores,countries,countries_tags "
geo_cols = " origins, manufacturing_places, countries "
geo_tags_cols = " origins_tags, manufacturing_places_tags, countries_tags "

off_df = spark.sql("SELECT" + p_id_col + geo_cols + "," + geo_tags_cols + " FROM data_main")
off_df.printSchema()

root
 |-- code: string (nullable = true)
 |-- origins: string (nullable = true)
 |-- manufacturing_places: string (nullable = true)
 |-- countries: string (nullable = true)
 |-- origins_tags: string (nullable = true)
 |-- manufacturing_places_tags: string (nullable = true)
 |-- countries_tags: string (nullable = true)



In [5]:
off_all_size = off_df.count()
off_cols_size = len(off_df.columns)
print("All data Size:\n" + str(off_cols_size) + "(columns) * " + str(off_all_size) + "(rows)")

All data Size:
7(columns) * 693829(rows)


### Data Cleaning and Preprocessing

In [6]:
# Find number of missing data

off_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in off_df.columns]).show()

+----+-------+--------------------+---------+------------+-------------------------+--------------+
|code|origins|manufacturing_places|countries|origins_tags|manufacturing_places_tags|countries_tags|
+----+-------+--------------------+---------+------------+-------------------------+--------------+
|   0| 651635|              626848|      561|      651689|                   626868|           561|
+----+-------+--------------------+---------+------------+-------------------------+--------------+



In [7]:
off_df.createOrReplaceTempView("off_df")

sql_filter = "SELECT * FROM off_df WHERE countries is not NULL \
              AND countries_tags is not NULL \
             AND origins is not NULL AND origins_tags is not NULL AND\
             manufacturing_places is not NULL AND manufacturing_places_tags is not NULL "

off_p_df = spark.sql(sql_filter)
off_p_all_size = off_p_df.count()
off_p_cols_size = len(off_p_df.columns)
print("Full GEO information data Size:\n" + str(off_p_cols_size) + "(columns) * " + str(off_p_all_size) + "(rows)")

Full GEO information data Size:
7(columns) * 26953(rows)


In [None]:
off_p_df.show(30)

Since columns with _tag label have more consistent data, we will use these columns from now.

In [8]:
off_p_df = off_p_df.drop('countries').drop('manufacturing_places').drop('origins')

In [None]:
off_p_df.show(10)

In [9]:
# Explode data

countries_df = off_p_df.withColumn('origins_tags', F.explode_outer(F.split('origins_tags', ',')))\
.withColumn('manufacturing_places_tags', F.explode_outer(F.split('manufacturing_places_tags', ',')))\
.withColumn('countries_tags', F.explode_outer(F.split('countries_tags', ',')))

In [10]:
# Remove "en:" occurances before name of each country in coutries_tags
countries_df = countries_df.withColumn('countries_tags', F.regexp_replace('countries_tags', "en:", ""))

In [11]:
countries_df.show(10)

+-------------+------------+-------------------------+--------------+
|         code|origins_tags|manufacturing_places_tags|countries_tags|
+-------------+------------+-------------------------+--------------+
|0000000274722|      france|                   france|        france|
|0000000290616|      quebec|          brossard-quebec|        canada|
|0000000394710|      quebec|          brossard-quebec|        canada|
|0000001071894|      france|           united-kingdom|united-kingdom|
|0000001938067|      quebec|          brossard-quebec|        canada|
|0000004302544|      quebec|                 brossard|        canada|
|0000004302544|      quebec|                   quebec|        canada|
|0000008237798|      quebec|                 brossard|        canada|
|0000008237798|      quebec|                   quebec|        canada|
|0000008240095|      quebec|          brossard-quebec|        canada|
+-------------+------------+-------------------------+--------------+
only showing top 10 

In [25]:
countries_mapping = countries_df.toPandas()
countries_mapping.head()

In [29]:
# create a new database mapping each country to some labels

countries_mapping['all_countries'] = countries_mapping.origins_tags + "," + countries_mapping.manufacturing_places_tags +"," + countries_mapping.manufacturing_places_tags

In [160]:
countries = pd.concat([pd.Series(row['all_countries'].split(','))              
                    for _, row in countries_mapping.iterrows()]).reset_index(drop=True)

In [161]:
countries = countries.drop_duplicates().reset_index(drop=True)
countries = countries.str.replace("-", " ") 

# Remove numbers from name of countries
countries = countries.str.replace('\d+', '')

print(len(countries))

8469


In [162]:
countries.head()

0             france
1             quebec
2    brossard quebec
3     united kingdom
4           brossard
dtype: object

## Using country name

An external database was used for country names. This database maps each country with country code
https://www.geodatasource.com

In [163]:
dataset_countries = pd.read_csv(DATA_FOLDER+ countries_file,sep='\t', error_bad_lines=False)
dataset_countries.head()

Unnamed: 0,CC_FIPS,CC_ISO,TLD,COUNTRY_NAME
0,AA,AW,.aw,Aruba
1,AC,AG,.ag,Antigua and Barbuda
2,AE,AE,.ae,United Arab Emirates
3,AF,AF,.af,Afghanistan
4,AG,DZ,.dz,Algeria


In [164]:
def map_country(data, country_code):
    global map_countries
    map_countries = map_countries.append({'input': data, 'country_code': country_code}, ignore_index=True)

def find_country(data):
    output = dataset_countries[dataset_countries.COUNTRY_NAME.str.contains(data, case=False)]
    if len(output):
        return output.iloc[0].CC_FIPS
    return 0

def assign_country_code(row):
    output = find_country(row)
    if output:
        map_country(row, output)
        return True
    return False

In [165]:
map_countries = pd.DataFrame(columns=['input', 'country_code'])

for i in range(len(countries)):
    if assign_country_code(countries[i]):
        countries = countries.drop([i])
        i -=1

195 name of countries detected in uncleaned dataset.

## Using City names

An external database was used for city names. This database maps each city with country code

https://www.maxmind.com/en/free-world-cities-database

In [168]:
# Find name of cities and replace with country code (Here some bias may happen, some cities have similar name)

dataset_cities = pd.read_csv(DATA_FOLDER+ cities_file,sep=',', error_bad_lines=False, encoding = "utf-8")
dataset_cities = dataset_cities[['Country', 'City']]
dataset_cities.head()

  interactivity=interactivity, compiler=compiler, result=result)


Unnamed: 0,Country,City
0,ad,aixas
1,ad,aixirivali
2,ad,aixirivall
3,ad,aixirvall
4,ad,aixovall


In [169]:
def find_city(data):
    output = dataset_cities[dataset_cities.City.str.contains(data, case=False, na=False)]
    if len(output):
        return output.iloc[0].Country
    return 0

def assign_country_code_using_city(row):
    output = find_city(row)
    if output:
        map_country(row, output)
        return True
    return False

In [212]:
seen = 0

In [218]:
# Try to map more locations using city names
countries = countries.reset_index(drop=True)
for i in range(seen,2000):
    if assign_country_code_using_city(countries[i]):
        countries = countries.drop([i])
        i -=1
        seen = i
print(seen)   

1900


2319 locations were matched with city names

## Using country name (contain)

In [228]:
for index, row in dataset_countries.iterrows():
    output = countries[countries.str.contains(row.COUNTRY_NAME, case=False, na=False)]
    for i in range(len(output)):
        map_country(output.iloc[i], row.CC_FIPS)
        countries = countries.drop(countries[countries == output.iloc[i]].index[0])

  


629 locations were matched with city names

## Using City name (contain)

In [263]:
for j in range(len(dataset_cities)):
    output = countries[countries.str.contains(str(dataset_cities.iloc[j].City) + " ", case=False, na=False)]
    for i in range(len(output)):
        map_country(output.iloc[i], dataset_cities.iloc[j].Country)
        countries = countries.drop(countries[countries == output.iloc[i]].index[0])
        
    output = countries[countries.str.contains(" " + str(dataset_cities.iloc[j].City), case=False, na=False)]
    for i in range(len(output)):
        map_country(output.iloc[i], dataset_cities.iloc[j].Country)
        countries = countries.drop(countries[countries == output.iloc[i]].index[0])

5,326 name of countries contain name of one city.

In [273]:
print("{0} strings remained\n{1} strings mapped to a country".format(len(countries), len(map_countries)))

1318 strings remained
7151 strings mapped to a country


### In continue:
   
    
    