# THE GOAL

The goal is to take Watson's role and use the intel (the data in the supplied files) from the police, Interpol, and undercover agents about Europe's criminals to identify the name behind which Moriarty is hiding.


# SOLUTION

# PART 1
- Watson, just like our great-grandfathers, we are again after Moriarty.

We need to catch him. Hmm... I need to be careful here: maybe it is not him, maybe it is her. All we know is that someone is masterminding unlawful activities and planning something bad. The Interpol agents, with the help of my boys, collected information that should give us the clues to determine the name Moriarty is hiding behind and arrest him.

- I have a number of .csv and .txt files about criminal activity and high-profile suspicious sales that were sent over by our neighbors (France, Germany, and the Netherlands) and by our own MI6 in the United Kingdom.

So, the first task is to combine the data into one table. I requested the name, the alias, and the last known whereabouts as latitude and longitude, but since the data comes from all around Europe, the columns might be named differently in each file.

I am thinking that adding the country to the data might be helpful in our future analysis.

Lastly, from my correspondence with our undercover agents, all the activity seems to be happening around major financial centers. If the city names are not in the data, I suppose you can work them out from the latitude and longitude. Mmmm... And a map of course, unless your knowledge of Europe's geography is exceptional.





Tasks:
1. Read the data from each file into a separate dataframe and add the country name ('country' column).
2. Identify the city around which the criminals operate. Add it to the dataframe ('city' column).
3. Concatenate the dataframes into a single dataframe with the four original columns renamed to: [name, alias, latitude, longitude]
4. Fill NAs in aliases with an empty string.


In [6]:
import pyspark
print(pyspark.__version__)

3.0.1


In [7]:
from pyspark.sql import SparkSession

import pyspark.sql.functions as F

# from https://datascience.stackexchange.com/questions/11356/merging-multiple-data-frames-row-wise-in-pyspark
from functools import reduce  # For Python 3.x
from pyspark.sql import DataFrame

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Python Spark SQL basic example") \
    .config("spark.sql.warehouse.dir", "hdfs://namenode/sql/metadata/hive")\
    .enableHiveSupport()\
    .getOrCreate()
# .config("spark.sql.warehouse.dir", "hdfs://namenode/sql/metadata/hive") \

In [8]:
# select database to use and create tables in
spark.sql("show databases").show(10, False)
spark.sql("use default")  # 'default' is a pre-created database where we can create tables
# (we could have skipped this statement but it makes it more explicit which database we use)

+---------+
|namespace|
+---------+
|default  |
+---------+



DataFrame[]

In [9]:


# we could also create our tables in our own custom database using:
# spark.sql("create database moriarty_db")
# spark.sql("use moriarty_db")
spark.sql("show tables").show(10, False)

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|default |criminals|false      |
|default |test     |false      |
+--------+---------+-----------+



In [10]:
%ls data

crime_type_profit_France.txt          criminals_Germany.csv
crime_type_profit_Germany.txt         criminals_Netherlands.csv
crime_type_profit_Netherlands.txt     criminals_United Kingdom.csv
crime_type_profit_United Kingdom.txt  id_dates.csv
criminals_France.csv


In [11]:
# get the data

def rename_cols(df, new_col_names):
    """Rename the columns of df, by position, to the names in new_col_names."""
    for col, new_col in zip(df.columns, new_col_names):
        df = df.withColumnRenamed(col, new_col)
        
    return df

#explore the dataframes: column names, shapes and combine into a single dataframe
country_list = ["United Kingdom", "Germany", "Netherlands", "France"]
dfs_dict = {}
for country_ in country_list:
    file_name = "./data/criminals_{}.csv".format(country_)
    df = spark.read.csv(file_name, header=True, inferSchema=True)
    print("Country: {}, rows: {}".format(country_, df.count()))
    new_col_names = ["id", "name", "alias", "latitude", "longitude"]
    df = rename_cols(df, new_col_names)
    df = df.withColumn('country', F.lit(country_))
    dfs_dict[country_] = df  # add data frame to the dict for a future union
# print("Len dfs_dict: {}".format(len(dfs_dict)))



def unionAll(dfs):
    return reduce(DataFrame.unionAll, dfs)

df_criminals_combined = unionAll(list(dfs_dict.values()))
print("Rows in combined df: {}".format(df_criminals_combined.count()))

Country: United Kingdom, rows: 306
Country: Germany, rows: 264
Country: Netherlands, rows: 250
Country: France, rows: 349
Rows in combined df: 1169
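
A note on the positional rename used above: `rename_cols` pairs old and new names with `zip`, which stops silently at the shorter list, so a file with an extra or missing column would be renamed only partly. Below is a minimal plain-Python sketch of a stricter mapping. `build_rename_map` is a hypothetical helper, not part of the notebook, and the example header is invented for illustration (the real headers in the country files may differ).

```python
def build_rename_map(old_cols, new_cols):
    """Pair old and new column names by position, refusing mismatched lengths."""
    if len(old_cols) != len(new_cols):
        raise ValueError(
            "expected {} columns, got {}: {}".format(len(new_cols), len(old_cols), old_cols)
        )
    return dict(zip(old_cols, new_cols))


# Hypothetical header from one country's file (assumption, for illustration only).
example_header = ["id", "nom", "alias", "lat", "lon"]
target = ["id", "name", "alias", "latitude", "longitude"]

rename_map = build_rename_map(example_header, target)
print(rename_map)
```

The resulting dictionary could then drive the same `withColumnRenamed` loop, with the length check catching a malformed file before the union.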


In [13]:
# save as ORC file (a popular data format in big data management)
df_criminals_combined.cache()
df_criminals_combined.coalesce(1).write.orc("./sql_data/criminals", mode='overwrite') # doesn't have 'orc' extension as it is a folder
# the file with extension '.orc' will be inside it

In [14]:
# check schema before casting and saving
df_criminals_combined.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- alias: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- country: string (nullable = false)



In [21]:
# often, to ensure that data types are compatible for retrieval via Hive, we need to define them explicitly

# import data types to cast data and define schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, \
                                DecimalType, FloatType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("alias", StringType(), True),
    StructField("latitude", FloatType(), True),
    StructField("longitude", FloatType(), True),
    StructField("country", StringType(), True)
])

# apply schema
df_criminals_combined_new_schema = spark.createDataFrame(df_criminals_combined.rdd, schema=schema)
df_criminals_combined_new_schema.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- alias: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- country: string (nullable = true)



In [17]:
# create a table that defines the data (including the data location)

# drop table if we created it before and define the table by registering 'table definition'
spark.sql("drop table if exists criminals")

# the hive datatypes should be appropriate (not necessarily identically named to spark datatypes)

# 'EXTERNAL' (also could be 'external') makes sure that if the table is dropped (deleted) the data remains
spark.sql("""CREATE EXTERNAL TABLE criminals (
 id int,
 name string,
 alias string,
 latitude float,
 longitude float,
 country string
    )
STORED AS ORC
LOCATION '/Users/vk/Documents/Python/holmes_moriarty_sql/src/sql_data/criminals'
"""
)

DataFrame[]

In [18]:
spark.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|criminals|      false|
| default|     test|      false|
+--------+---------+-----------+



In [19]:
criminals_df = spark.sql("select * from criminals")
# print("Table (read-in) count: {}".format(criminals_df.count()))
criminals_df.show(10, False)

+---+------------------------+-----+--------+---------+--------------+
|id |name                    |alias|latitude|longitude|country       |
+---+------------------------+-----+--------+---------+--------------+
|0  |Ms. Diane Barnett       |null |51.3327 |-0.0328  |United Kingdom|
|1  |Elizabeth McDonald      |null |51.3732 |-0.0396  |United Kingdom|
|2  |Jacqueline Martin-Winter|null |51.3536 |-0.223   |United Kingdom|
|3  |Roger Farmer            |null |51.2891 |-0.208   |United Kingdom|
|4  |Mrs. Georgina Harrison  |null |51.6004 |0.0054   |United Kingdom|
|5  |Peter Stevens           |null |51.6441 |0.0188   |United Kingdom|
|6  |Georgina Bell           |null |51.5304 |-0.0927  |United Kingdom|
|7  |Miss Lesley Sullivan    |null |51.7303 |-0.2607  |United Kingdom|
|8  |Keith Kelly             |Happy|51.4393 |-0.1421  |United Kingdom|
|9  |Shane Bailey            |null |51.2735 |-0.3407  |United Kingdom|
+---+------------------------+-----+--------+---------+--------------+
only showing top 10 rows

In [20]:
criminals_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- alias: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- country: string (nullable = true)



In [16]:
# calculate mean latitude and longitude to identify the major financial centers (cities)
# (copy and paste the lat, lon values into Google Maps)
# dataframe.filter(df['salary'] > 100000).agg({"age": "avg"})

spark.sql("""select country, AVG(latitude) as avg_lat, AVG(longitude) as avg_lon
                            from criminals
                             group by country
                             order by country""").show(10, False)
# # print("Table (read-in) count: {}".format(criminals_df.count()))
# criminals_df.show(10, False)

# for country_ in country_list:
#     country_df = df_criminals_combined.where("country = '{}'".format(country_))
#     lat = round(country_df.agg({"latitude": "avg"}).collect()[0][0], 4)
#     lon = round(country_df.agg({"longitude": "avg"}).collect()[0][0], 4)
#     print("Country: {}, (lat, lon): {}, {}".format(country_, lat, lon))
#     print(40 * "*")

+--------------+------------------+--------------------+
|country       |avg_lat           |avg_lon             |
+--------------+------------------+--------------------+
|France        |48.86060943166301 |2.364566761989648   |
|Germany       |50.097135254831024|8.678964380061988   |
|Netherlands   |52.37530560302734 |4.900951610565185   |
|United Kingdom|51.50456336276983 |-0.12400588218790493|
+--------------+------------------+--------------------+



In [17]:
spark.sql("""select country,
                    ROUND(AVG(latitude), 4) as avg_lat,
                    ROUND(AVG(longitude), 4) as avg_lon
                from criminals
                 group by country
                 order by country""").show(10, False)

+--------------+-------+-------+
|country       |avg_lat|avg_lon|
+--------------+-------+-------+
|France        |48.8606|2.3646 |
|Germany       |50.0971|8.679  |
|Netherlands   |52.3753|4.901  |
|United Kingdom|51.5046|-0.124 |
+--------------+-------+-------+
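
Instead of pasting the averages into a map, the lookup can be sketched in code. The following plain-Python example, independent of Spark, picks the closest city by great-circle (haversine) distance. The city-centre coordinates are approximate values assumed for illustration, and `haversine_km` and `nearest_city` are hypothetical helper names; the country averages are the rounded values printed above.

```python
from math import asin, cos, radians, sin, sqrt

# Approximate city-centre coordinates (assumed values, not taken from the data files).
FINANCIAL_CENTERS = {
    "London": (51.5074, -0.1278),
    "Paris": (48.8566, 2.3522),
    "Frankfurt": (50.1109, 8.6821),
    "Amsterdam": (52.3676, 4.9041),
}


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two (lat, lon) points, in kilometres."""
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * 6371.0 * asin(sqrt(a))  # 6371 km: mean Earth radius


def nearest_city(lat, lon):
    """Return the name of the candidate city closest to the given point."""
    return min(
        FINANCIAL_CENTERS,
        key=lambda city: haversine_km(lat, lon, *FINANCIAL_CENTERS[city]),
    )


# Rounded per-country averages from the query output above.
country_averages = {
    "France": (48.8606, 2.3646),
    "Germany": (50.0971, 8.679),
    "Netherlands": (52.3753, 4.901),
    "United Kingdom": (51.5046, -0.124),
}

for country, (lat, lon) in country_averages.items():
    print(country, "->", nearest_city(lat, lon))
```

This only chooses among the four listed candidates, so it confirms a guess rather than discovering an unknown city.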



In [None]:
# add the city name to the df

#it can be done using a series of if/else statements, such as 'if country_ == 'France': city = 'Paris', etc. OR
# using a dictionary as below:
country_city_dict = {"United Kingdom": "London", "Germany": "Frankfurt", "Netherlands": "Amsterdam", "France": "Paris"}
country_city_dict


In [21]:
spark.sql("""select *, 
                case 
                    when country = 'United Kingdom' then 'London'
                    when country = 'France' then 'Paris'
                    when country = 'Germany' then 'Frankfurt'
                    when country = 'Netherlands' then 'Amsterdam'
                end as city
            from criminals""").show(10, False)

+---+------------------------+-----+--------+---------+--------------+------+
|id |name                    |alias|latitude|longitude|country       |city  |
+---+------------------------+-----+--------+---------+--------------+------+
|0  |Ms. Diane Barnett       |null |51.3327 |-0.0328  |United Kingdom|London|
|1  |Elizabeth McDonald      |null |51.3732 |-0.0396  |United Kingdom|London|
|2  |Jacqueline Martin-Winter|null |51.3536 |-0.223   |United Kingdom|London|
|3  |Roger Farmer            |null |51.2891 |-0.208   |United Kingdom|London|
|4  |Mrs. Georgina Harrison  |null |51.6004 |0.0054   |United Kingdom|London|
|5  |Peter Stevens           |null |51.6441 |0.0188   |United Kingdom|London|
|6  |Georgina Bell           |null |51.5304 |-0.0927  |United Kingdom|London|
|7  |Miss Lesley Sullivan    |null |51.7303 |-0.2607  |United Kingdom|London|
|8  |Keith Kelly             |Happy|51.4393 |-0.1421  |United Kingdom|London|
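|9  |Shane Bailey            |null |51.2735 |-0.3407  |United Kingdom|London|
+---+------------------------+-----+--------+---------+--------------+------+
only showing top 10 rows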

In [29]:
# fill NAs with an empty string
# we'll also assign this new data to a variable name for saving and creating a new table to use later
# (note that 'show' method is moved to the spark dataframe)
criminals_with_city = spark.sql("""select id, name,
                case
                    when alias is null then ''
                    else alias
                end as alias,
                country,
                case 
                    when country = 'United Kingdom' then 'London'
                    when country = 'France' then 'Paris'
                    when country = 'Germany' then 'Frankfurt'
                    when country = 'Netherlands' then 'Amsterdam'
                end as city
            from criminals""")

criminals_with_city.show(10, False)

+---+------------------------+-----+--------------+------+
|id |name                    |alias|country       |city  |
+---+------------------------+-----+--------------+------+
|0  |Ms. Diane Barnett       |     |United Kingdom|London|
|1  |Elizabeth McDonald      |     |United Kingdom|London|
|2  |Jacqueline Martin-Winter|     |United Kingdom|London|
|3  |Roger Farmer            |     |United Kingdom|London|
|4  |Mrs. Georgina Harrison  |     |United Kingdom|London|
|5  |Peter Stevens           |     |United Kingdom|London|
|6  |Georgina Bell           |     |United Kingdom|London|
|7  |Miss Lesley Sullivan    |     |United Kingdom|London|
|8  |Keith Kelly             |Happy|United Kingdom|London|
|9  |Shane Bailey            |     |United Kingdom|London|
+---+------------------------+-----+--------------+------+
only showing top 10 rows



In [23]:
criminals_with_city.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- alias: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)



In [30]:
criminals_with_city.cache()
criminals_with_city.coalesce(1).write.orc("./sql_data/criminals_with_city", mode='overwrite') # doesn't have 'orc' extension as it is a folder
# the file with extension '.orc' will be inside it

In [35]:
spark.sql("drop table if exists criminals_with_city")

spark.sql("""CREATE EXTERNAL TABLE criminals_with_city (
 id int,
 name string,
 alias string,
 country string,
 city string)
STORED AS ORC
LOCATION '/Users/vk/Documents/Python/holmes_moriarty_sql/src/sql_data/criminals_with_city'
"""
)


DataFrame[]

In [36]:
# check that the data is readable
spark.sql("select * from criminals_with_city").show(10, False)

+---+------------------------+-----+--------------+------+
|id |name                    |alias|country       |city  |
+---+------------------------+-----+--------------+------+
|0  |Ms. Diane Barnett       |     |United Kingdom|London|
|1  |Elizabeth McDonald      |     |United Kingdom|London|
|2  |Jacqueline Martin-Winter|     |United Kingdom|London|
|3  |Roger Farmer            |     |United Kingdom|London|
|4  |Mrs. Georgina Harrison  |     |United Kingdom|London|
|5  |Peter Stevens           |     |United Kingdom|London|
|6  |Georgina Bell           |     |United Kingdom|London|
|7  |Miss Lesley Sullivan    |     |United Kingdom|London|
|8  |Keith Kelly             |Happy|United Kingdom|London|
|9  |Shane Bailey            |     |United Kingdom|London|
+---+------------------------+-----+--------------+------+
only showing top 10 rows



# Task 2
Add crime_type and profit info to criminals: merge (join) the criminals table with the crime type and profit information.

- Great, Watson!
- Now we need to know what every one of those suspects did wrong, that is the crime type, and ideally how much they profited from it. Moriarty is not a small fish: he is in the category with the largest total sales.

- You'll need to add the crime type and the profit from the files to the table you already put together. Be mindful of the file types. I also believe that the separator in these files may be different from the one in the files you used previously.
- Moriarty made one of the top 5 sales last year. He is too smart for nicknames; I am pretty sure he doesn't have an alias.
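
Since the separator may differ between file types, it can help to check the header line before choosing the `sep` argument. The following is a minimal plain-Python sketch; `guess_separator` is a hypothetical helper, and the two header strings are assumptions based on the column names used in this notebook, not lines copied from the files.

```python
def guess_separator(header_line, candidates=(",", ";", "\t", "|", " ")):
    """Return the candidate separator that occurs most often in the header line."""
    counts = {sep: header_line.count(sep) for sep in candidates}
    best = max(counts, key=counts.get)
    if counts[best] == 0:
        raise ValueError("no known separator found in: {!r}".format(header_line))
    return best


# Assumed header lines, for illustration only.
print(repr(guess_separator("id,name,alias,latitude,longitude")))
print(repr(guess_separator("name crime_type profit")))
```

The count-based guess is only a heuristic: a header whose column names contain spaces would mislead it, so the result is worth confirming by eye.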


# Solution (task 2)

In [37]:
# union (concatenate) the crime type and profit files for all countries

country_list = ["United Kingdom", "Germany", "Netherlands", "France"]
dfs_dict = {}
for country_ in country_list:
    file_name = "./data/crime_type_profit_{}.txt".format(country_)
    df = spark.read.csv(file_name, header=True, sep=" ")
    print("rows: {}".format(df.count()))
    df = df.withColumn('country', F.lit(country_))
    dfs_dict[country_] = df
print("Len dfs_dict: {}".format(len(dfs_dict)))

#combine all dataframes into one
df_crime_type_profit = unionAll(list(dfs_dict.values()))
print(list(df_crime_type_profit.columns))
df_crime_type_profit.show(4, False)


rows: 306
rows: 264
rows: 250
rows: 349
Len dfs_dict: 4
['name', 'crime_type', 'profit', 'country']
+------------------------+----------+------+--------------+
|name                    |crime_type|profit|country       |
+------------------------+----------+------+--------------+
|Ms. Diane Barnett       |theft     |284   |United Kingdom|
|Elizabeth McDonald      |theft     |59    |United Kingdom|
|Jacqueline Martin-Winter|forgery   |150   |United Kingdom|
|Roger Farmer            |theft     |378   |United Kingdom|
+------------------------+----------+------+--------------+
only showing top 4 rows



In [38]:
df_crime_type_profit.printSchema()

root
 |-- name: string (nullable = true)
 |-- crime_type: string (nullable = true)
 |-- profit: string (nullable = true)
 |-- country: string (nullable = false)



In [None]:
# profit is a string, which is incorrect -> the schema needs to be redefined
# or the column recast
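
Why the string type matters: text values are compared character by character, so sorting profits stored as strings gives a misleading ranking. A small plain-Python illustration, using the first four profit values shown above:

```python
profits_as_text = ["284", "59", "150", "378"]

# Text comparison looks at the first character first, so "59" outranks "378".
text_order = sorted(profits_as_text, reverse=True)

# After converting to integers the ranking is numeric.
numeric_order = sorted((int(p) for p in profits_as_text), reverse=True)

print(text_order)     # ['59', '378', '284', '150']
print(numeric_order)  # [378, 284, 150, 59]
```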

In [40]:
df_crime_type_profit = df_crime_type_profit.withColumn('profit', F.col('profit').cast(IntegerType()))

In [41]:
df_crime_type_profit.printSchema()

root
 |-- name: string (nullable = true)
 |-- crime_type: string (nullable = true)
 |-- profit: integer (nullable = true)
 |-- country: string (nullable = false)



In [44]:
df_crime_type_profit.cache()
df_crime_type_profit.coalesce(1).write.orc("./sql_data/crime_profit", mode="overwrite")

In [47]:
spark.sql("drop table if exists crime_profit")

spark.sql("""CREATE EXTERNAL TABLE crime_profit (
  name string,
  crime_type string,
  profit int,
  country string)
STORED AS ORC
LOCATION '/Users/vk/Documents/Python/holmes_moriarty_sql/src/sql_data/crime_profit'
"""
)

DataFrame[]

In [48]:
spark.sql("select * from crime_profit").show(10, False)

+------------------------+----------+------+--------------+
|name                    |crime_type|profit|country       |
+------------------------+----------+------+--------------+
|Ms. Diane Barnett       |theft     |284   |United Kingdom|
|Elizabeth McDonald      |theft     |59    |United Kingdom|
|Jacqueline Martin-Winter|forgery   |150   |United Kingdom|
|Roger Farmer            |theft     |378   |United Kingdom|
|Mrs. Georgina Harrison  |theft     |55    |United Kingdom|
|Peter Stevens           |robbery   |868   |United Kingdom|
|Georgina Bell           |theft     |365   |United Kingdom|
|Miss Lesley Sullivan    |forgery   |320   |United Kingdom|
|Keith Kelly             |theft     |399   |United Kingdom|
|Shane Bailey            |forgery   |495   |United Kingdom|
+------------------------+----------+------+--------------+
only showing top 10 rows



In [49]:
spark.sql("""select * from criminals a
          left join crime_profit b
          on a.name = b.name and a.country = b.country""").show(10, False)

+---+------------------------+-----+--------+---------+--------------+------------------------+----------+------+--------------+
|id |name                    |alias|latitude|longitude|country       |name                    |crime_type|profit|country       |
+---+------------------------+-----+--------+---------+--------------+------------------------+----------+------+--------------+
|0  |Ms. Diane Barnett       |null |51.3327 |-0.0328  |United Kingdom|Ms. Diane Barnett       |theft     |284   |United Kingdom|
|1  |Elizabeth McDonald      |null |51.3732 |-0.0396  |United Kingdom|Elizabeth McDonald      |theft     |59    |United Kingdom|
|2  |Jacqueline Martin-Winter|null |51.3536 |-0.223   |United Kingdom|Jacqueline Martin-Winter|forgery   |150   |United Kingdom|
|3  |Roger Farmer            |null |51.2891 |-0.208   |United Kingdom|Roger Farmer            |theft     |378   |United Kingdom|
|4  |Mrs. Georgina Harrison  |null |51.6004 |0.0054   |United Kingdom|Mrs. Georgina Harrison  |th

In [None]:

# drop duplicates
# ('criminals_with_city' is the dataframe with the alias and city columns built earlier)
df_with_city = criminals_with_city.drop_duplicates(["name"])
df_with_city.count()

In [None]:
# join main criminal info with crime type and profit
df_city_profit = df_with_city.join(df_crime_type_profit, ["name","country"], "left")
print("Df rows: {}".format(df_city_profit.count()))
# print(df_city_profit.columns)
df_city_profit.orderBy('profit', ascending = False).show(10, False)

In [None]:
# profit column is not sorted properly. possibly the data type is the issue

In [None]:
df_city_profit.printSchema()

In [None]:
df_city_profit = df_city_profit.withColumn("profit", F.col("profit").cast("int"))
df_city_profit.printSchema()

In [None]:
# let's order by profit again...
df_city_profit.orderBy('profit', ascending = False).show(5, False)

In [None]:
#investigate crime types and get total sales for each
df_by_profit = df_city_profit.groupBy(["crime_type"]).\
                agg(F.sum("profit").alias("total_profit")).\
                orderBy("total_profit", ascending=False)

df_by_profit.show(10, False)
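
The group-and-sum step can be checked by hand on a small sample. The plain-Python sketch below totals profit per crime type for the ten United Kingdom rows printed earlier from `crime_profit`. It illustrates the aggregation only; the ranking over the full data set may differ.

```python
from collections import defaultdict

# (crime_type, profit) for the first ten rows shown in the crime_profit output.
sample_rows = [
    ("theft", 284), ("theft", 59), ("forgery", 150), ("theft", 378), ("theft", 55),
    ("robbery", 868), ("theft", 365), ("forgery", 320), ("theft", 399), ("forgery", 495),
]

# Sum profit per crime type, the same idea as groupBy + sum.
totals = defaultdict(int)
for crime_type, profit in sample_rows:
    totals[crime_type] += profit

# Order by total profit, largest first, and take the leading crime type.
ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
top_crime_type = ranked[0][0]

print(ranked)
print(top_crime_type)
```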

In [None]:
crime_type_big_sales = df_by_profit.select("crime_type").collect()[0][0]
crime_type_big_sales

In [None]:
print("crime_type = '{}'".format(crime_type_big_sales))

In [None]:
df_city_profit.columns

In [None]:
countries_crime_type_profit_df = df_city_profit.where("crime_type == '{}'".format(crime_type_big_sales))\
                    .groupBy(["country"])\
                    .agg(F.sum("profit").alias('total_profit'))\
                    .orderBy("total_profit", ascending=False)
    
countries_crime_type_profit_df.show(10, False)

In [None]:
top_country = countries_crime_type_profit_df.select("country").collect()[0][0]
top_country

In [None]:
df_city_profit.show(3)

In [None]:
# Show top 5 salesmen in the selected country
df_large_sales_alias_null = df_city_profit.where("country = '{}' and alias = '' and crime_type = '{}'".format(top_country, crime_type_big_sales))\
                                            .orderBy("profit", ascending = False)

df_large_sales_alias_null.show(5)

# PART 3

Add the date of the last deal: Moriarty does not deal on Sundays.
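
The Sunday rule comes down to mapping a date to its weekday name. Here is a minimal plain-Python sketch, assuming dates arrive either as `YYYY-MM-DD` strings or as date objects; `weekday_name` and `DAY_NAMES` are hypothetical names. It uses a fixed list of English names, so the result does not depend on the machine's locale.

```python
from datetime import date, datetime

# date.weekday() returns 0 for Monday through 6 for Sunday.
DAY_NAMES = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")


def weekday_name(value):
    """Return the English weekday name for a date object or a 'YYYY-MM-DD' string."""
    if value is None:  # missing dates stay missing
        return None
    if isinstance(value, str):
        value = datetime.strptime(value, "%Y-%m-%d").date()
    return DAY_NAMES[value.weekday()]


print(weekday_name("2021-01-03"))      # Sunday
print(weekday_name(date(2021, 1, 4)))  # Monday
```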

In [None]:
id_dates = spark.read.csv("./data/id_dates.csv", header=True, inferSchema=True)
print("id_dates shape: {}".format(id_dates.count()))
id_dates.show(4)

In [None]:
df_selected_with_dates = df_city_profit.join(id_dates, on=["id", "country"], how="left")
print(df_selected_with_dates.count())
df_selected_with_dates.show(3)

In [None]:
df_selected_with_dates.printSchema()

In [None]:
from datetime import datetime

from pyspark.sql.functions import udf
from pyspark.sql.types import DateType, StringType


def weekday(date):
    """Generate the day of the week from a date (string or date/datetime object)."""
    if date is None:  # the left join can leave rows without a date
        return None
    if isinstance(date, str):
        date = datetime.strptime(date, "%Y-%m-%d")  # change the format if necessary

    return date.strftime("%A")


weekday_udf = udf(weekday, StringType())

# conversion to DateType is not necessary as it is handled inside the function
# here it is offered as an example of re-casting
df_selected_with_dates = df_selected_with_dates.withColumn("date", F.col("date").cast(DateType()))

# name the new column 'weekday' so that the filter in the final query can find it
df_selected_with_dates = df_selected_with_dates.withColumn("weekday", weekday_udf("date"))
df_selected_with_dates.show(10)

In [None]:
crime_type_big_sales

In [None]:
# Show top 5 salesmen in the selected country
df_final = df_selected_with_dates.where("""country = '{}' 
                                            and alias = '' 
                                            and crime_type = '{}'
                                            and weekday != 'Sunday'
                                       """.format(top_country, crime_type_big_sales))

df_final.show(5)

In [None]:
moriarty_name =  df_final.select("name").collect()[0][0]
print("The name Moriarty is hiding behind: {}".format(moriarty_name))