# Package Installation and Data Loading

The first step was to install and import some packages and modules that will be used later.

**Package installation:**

In [1]:
# pip install pyspark
# pip install pandas
# pip install "pymongo[srv]"

Note: you may need to restart the kernel to use updated packages.


**Import of the modules:**

In [3]:
import time
from functools import reduce
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

After the import, the chosen datasets were associated with a variable that will be used throughout the work.

In [4]:
url1 = "https://raw.githubusercontent.com/CarlosGomes00/BigData/main/Big%20Data/death-rate-from-suicides-gho.csv"
url2 = "https://raw.githubusercontent.com/CarlosGomes00/BigData/main/Big%20Data/rates-from-drug-use-disorders.csv"
url3 = "https://raw.githubusercontent.com/CarlosGomes00/BigData/main/Big%20Data/total-alcohol-consumption-per-capita-litres-of-pure-alcohol.csv"
url4 = "https://raw.githubusercontent.com/CarlosGomes00/BigData/main/Big%20Data/burden-disease-from-each-mental-illness.csv"

# Pandas

Initially, we tested Pandas to process the data. Time was also used to see how long processing took.

In [5]:
# Reading csv files
file1 = pd.read_csv(url1)
file2 = pd.read_csv(url2)
file3 = pd.read_csv(url3)
file4 = pd.read_csv(url4)

In [75]:
# Starts counting the execution time
start = time.time()  

# List containing the DataFrames to be combined
Data_Frames = [file1, file2, file3, file4]

# Uses the reduce function to merge all the DataFrames in the Data_Frames list
# The join is made using the columns 'Entity', 'Year' and 'Code' as join keys
combdados = reduce(lambda left, right:pd.merge(left, right, on=['Entity','Year','Code']), Data_Frames)

# Gets a list of all unique entities/years in the combined DataFrame
paises_comuns = combdados['Entity'].unique()
anos_comuns = combdados['Year'].unique()

# Filters the combined DataFrame to include only rows where 'Entity' is in common_countries and 'Year' is in common_years
df_final = combdados[combdados['Entity'].isin(paises_comuns) & combdados['Year'].isin(anos_comuns)]

# Ends the execution time count
end = time.time()

# Calculates the total execution time
tempo_pandas = (end - start)


print(tempo_pandas)
df_final

0.02780604362487793


Unnamed: 0,Entity,Code,Year,Age-standardized suicide rate - Sex: both sexes,Drug use disorders - Sex: Both - Age: Age-standardized (Rate),"Total alcohol consumption per capita (liters of pure alcohol, projected estimates, 15+ years of age)","DALYs from depressive disorders per 100,000 people in, both sexes aged age-standardized","DALYs from schizophrenia per 100,000 people in, both sexes aged age-standardized","DALYs from bipolar disorder per 100,000 people in, both sexes aged age-standardized","DALYs from eating disorders per 100,000 people in, both sexes aged age-standardized","DALYs from anxiety disorders per 100,000 people in, both sexes aged age-standardized"
0,Afghanistan,AFG,2010,6.6700,1.55,0.21,888.55730,133.93993,147.637950,21.996126,442.29670
1,Afghanistan,AFG,2015,5.9900,1.55,0.21,889.83100,135.82290,148.000870,24.176653,453.54910
2,Afghanistan,AFG,2018,5.9100,1.60,0.21,888.19760,135.77147,148.186950,24.742624,455.19724
3,Albania,ALB,2000,5.2300,0.72,6.57,383.35883,181.26312,117.579150,22.061806,355.20570
4,Albania,ALB,2005,7.6800,0.78,7.65,388.85956,182.34738,117.714270,24.131720,356.89362
...,...,...,...,...,...,...,...,...,...,...,...
899,Zimbabwe,ZWE,2000,19.9764,1.07,2.46,522.54120,129.83534,114.570190,23.274557,302.44020
900,Zimbabwe,ZWE,2005,21.9666,1.06,2.77,534.28345,128.15717,114.573400,21.463827,302.60360
901,Zimbabwe,ZWE,2010,35.0142,1.16,3.93,530.62470,126.10969,114.908585,19.506409,301.86316
902,Zimbabwe,ZWE,2015,30.7352,1.26,4.92,546.46204,127.10872,115.320730,20.423056,302.20868


In [9]:
# Count the number of occurrences of each 'Entity' in the final DataFrame and store in 'z'. 'z' will be a boolean series where the value is True if the count is less than 5
z = df_final.Entity.value_counts() < 5

# Checks if there are any entities with less than 5 occurrences, if so, filters the final DataFrame to exclude these entities
if z.any():
    df_final = df_final[~df_final.Entity.isin(z[z].index)]
else:
    pass

# Resets the index of the final DataFrame
df_final = df_final.reset_index()

df_final

Unnamed: 0,index,Entity,Code,Year,Age-standardized suicide rate - Sex: both sexes,Drug use disorders - Sex: Both - Age: Age-standardized (Rate),"Total alcohol consumption per capita (liters of pure alcohol, projected estimates, 15+ years of age)","DALYs from depressive disorders per 100,000 people in, both sexes aged age-standardized","DALYs from schizophrenia per 100,000 people in, both sexes aged age-standardized","DALYs from bipolar disorder per 100,000 people in, both sexes aged age-standardized","DALYs from eating disorders per 100,000 people in, both sexes aged age-standardized","DALYs from anxiety disorders per 100,000 people in, both sexes aged age-standardized"
0,3,Albania,ALB,2000,5.2300,0.72,6.57,383.35883,181.26312,117.579150,22.061806,355.20570
1,4,Albania,ALB,2005,7.6800,0.78,7.65,388.85956,182.34738,117.714270,24.131720,356.89362
2,5,Albania,ALB,2010,7.6300,0.68,7.69,383.14902,184.12775,117.873230,26.319311,360.68094
3,6,Albania,ALB,2015,4.2300,0.82,6.74,382.93760,185.51762,117.718880,28.092310,366.80685
4,7,Albania,ALB,2018,3.9400,0.87,7.17,384.17953,185.36105,117.761635,28.972872,366.46720
...,...,...,...,...,...,...,...,...,...,...,...,...
885,899,Zimbabwe,ZWE,2000,19.9764,1.07,2.46,522.54120,129.83534,114.570190,23.274557,302.44020
886,900,Zimbabwe,ZWE,2005,21.9666,1.06,2.77,534.28345,128.15717,114.573400,21.463827,302.60360
887,901,Zimbabwe,ZWE,2010,35.0142,1.16,3.93,530.62470,126.10969,114.908585,19.506409,301.86316
888,902,Zimbabwe,ZWE,2015,30.7352,1.26,4.92,546.46204,127.10872,115.320730,20.423056,302.20868


In [10]:
# Removes the 'index' column
df_final = df_final.drop(columns=['index'])

df_final

Unnamed: 0,Entity,Code,Year,Age-standardized suicide rate - Sex: both sexes,Drug use disorders - Sex: Both - Age: Age-standardized (Rate),"Total alcohol consumption per capita (liters of pure alcohol, projected estimates, 15+ years of age)","DALYs from depressive disorders per 100,000 people in, both sexes aged age-standardized","DALYs from schizophrenia per 100,000 people in, both sexes aged age-standardized","DALYs from bipolar disorder per 100,000 people in, both sexes aged age-standardized","DALYs from eating disorders per 100,000 people in, both sexes aged age-standardized","DALYs from anxiety disorders per 100,000 people in, both sexes aged age-standardized"
0,Albania,ALB,2000,5.2300,0.72,6.57,383.35883,181.26312,117.579150,22.061806,355.20570
1,Albania,ALB,2005,7.6800,0.78,7.65,388.85956,182.34738,117.714270,24.131720,356.89362
2,Albania,ALB,2010,7.6300,0.68,7.69,383.14902,184.12775,117.873230,26.319311,360.68094
3,Albania,ALB,2015,4.2300,0.82,6.74,382.93760,185.51762,117.718880,28.092310,366.80685
4,Albania,ALB,2018,3.9400,0.87,7.17,384.17953,185.36105,117.761635,28.972872,366.46720
...,...,...,...,...,...,...,...,...,...,...,...
885,Zimbabwe,ZWE,2000,19.9764,1.07,2.46,522.54120,129.83534,114.570190,23.274557,302.44020
886,Zimbabwe,ZWE,2005,21.9666,1.06,2.77,534.28345,128.15717,114.573400,21.463827,302.60360
887,Zimbabwe,ZWE,2010,35.0142,1.16,3.93,530.62470,126.10969,114.908585,19.506409,301.86316
888,Zimbabwe,ZWE,2015,30.7352,1.26,4.92,546.46204,127.10872,115.320730,20.423056,302.20868


In [11]:
# # Counts the number of occurrences of each value in the 'Entity' column of the 'df_final' DataFrame and compares whether each count is equal to 5.
df_final.Entity.value_counts() == 5

Entity
Albania        True
Rwanda         True
Netherlands    True
New Zealand    True
Nicaragua      True
               ... 
Germany        True
Ghana          True
Greece         True
Grenada        True
Zimbabwe       True
Name: count, Length: 178, dtype: bool

In [12]:
# Convert the DataFrame 'df_final' to CSV format and save the file with the name 'Data_Frame_Pandas.csv'
df_final_csv = df_final.to_csv('Data_Frame_Pandas.csv')

# PySpark

Then we did the same procedure for Pyspak.

In [13]:
# Create a new SparkSession with the application name 'Project' or get an existing session
spark = SparkSession.builder.appName('Projeto').getOrCreate()

In [14]:
# Checks Spark version and current session
spark

In [16]:
# Import the dataframes
# The command 'option(header, true)' is important to define the title of the columns, which by default would be c_0, c_1, etc...
# inferSchema=True is important for transforming data that is assumed to be str into int

df_file1 = spark.read.option('header','true').csv("C:\\Users\\carla\\OneDrive\\Ambiente de Trabalho\\Universidade\Mestrado\\1º Ano\\2º Semestre\\Big Data\\Projeto\\Data Sets\\death-rate-from-suicides-gho.csv",inferSchema = True)
df_file2 = spark.read.option('header','true').csv("C:\\Users\\carla\\OneDrive\\Ambiente de Trabalho\\Universidade\Mestrado\\1º Ano\\2º Semestre\\Big Data\\Projeto\\Data Sets\\rates-from-drug-use-disorders.csv",inferSchema = True)
df_file3 = spark.read.option('header','true').csv("C:\\Users\\carla\\OneDrive\\Ambiente de Trabalho\\Universidade\Mestrado\\1º Ano\\2º Semestre\\Big Data\\Projeto\\Data Sets\\total-alcohol-consumption-per-capita-litres-of-pure-alcohol.csv",inferSchema = True)
df_file4 = spark.read.option('header','true').csv("C:\\Users\\carla\\OneDrive\\Ambiente de Trabalho\\Universidade\Mestrado\\1º Ano\\2º Semestre\\Big Data\\Projeto\\Data Sets\\burden-disease-from-each-mental-illness.csv",inferSchema = True)

In [17]:
# Check the type of date in the csv file
type(df_file1)

pyspark.sql.dataframe.DataFrame

In [18]:
# Prints the schema (structure of columns and their types) of the DataFrame df_file1
# If we don't specify, the data type will be assumed to be a string
df_file1.printSchema()

root
 |-- Entity: string (nullable = true)
 |-- Code: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Age-standardized suicide rate - Sex: both sexes: double (nullable = true)



In [19]:
# Shows the type of data present in DataFrame df_file1
df_file1

# Show the first 5 lines of the DataFrame 
df_file1.show(5)

+-----------+----+----+-----------------------------------------------+
|     Entity|Code|Year|Age-standardized suicide rate - Sex: both sexes|
+-----------+----+----+-----------------------------------------------+
|Afghanistan| AFG|2000|                                           7.71|
|Afghanistan| AFG|2001|                                           7.89|
|Afghanistan| AFG|2002|                                           7.85|
|Afghanistan| AFG|2003|                                           7.72|
|Afghanistan| AFG|2004|                                           7.77|
+-----------+----+----+-----------------------------------------------+
only showing top 5 rows



In [89]:
# Starts counting the execution time
start = time.time()

# Performs internal joins of DataFrames df_file1, df_file2, df_file3 and df_file4 using the columns 'Entity', 'Code' and 'Year' as join keys
df_joined = df_file1.join(df_file2, ['Entity', 'Code', 'Year'],"inner").join(df_file3, ['Entity', 'Code', 'Year'],"inner").join(df_file4, ['Entity', 'Code', 'Year'],"inner")

# Ends the execution time count
end = time.time()

# Calculates the total execution time
tempo_spark = (end - start)

# Prints the execution time
print(tempo_spark)

0.041808128356933594


In [92]:
# Displays the DataFrame
df_joined.show()

+-------------------+----+----+-----------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+--------------------------------------------------------------------------------+-----------------------------------------------------------------------------------+-----------------------------------------------------------------------------------+------------------------------------------------------------------------------------+
|             Entity|Code|Year|Age-standardized suicide rate - Sex: both sexes|Drug use disorders - Sex: Both - Age: Age-standardized (Rate)|Total alcohol consumption per capita (liters of pure alcohol, projected estimates, 15+ years of age)|DALYs from depressive disorders per 100,000 people in, both sexes aged age-standardized|DALYs

Then we decided to compare the time required by each of the libraries 

In [94]:
print(f'Using PySpark it takes {tempo_spark} seconds!') 
print(f'Using Pandas it takes {tempo_pandas} seconds!') 

Using PySpark it takes 0.041808128356933594 seconds!
Using Pandas it takes 0.02780604362487793 seconds!


# Upload the new dataset into a database

Once the data has been processed, we place it in our MongoDB database.

In [19]:
# Connection String provided by MongoDB - Replace <password> with the real password
uri = "mongodb+srv://carlosfcgomes0:<password>@bigdatadb.qzzcrlk.mongodb.net/?retryWrites=true&w=majority&appName=BigDataDb"

# Create a new client and connect to the MongoDB server
client = MongoClient(uri, server_api=ServerApi('1'))

# Confirm that the connection was successful
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


In [20]:
# Accessing the database
db = client.get_database("BigDatadb")

In [21]:
# Access the collection
collection = db.get_collection("BigData")

In [25]:
# Read the csv created by Pandas and assign it to a variable
data = pd.read_csv('Data_Frame_Pandas.csv') 


In [26]:
# Convert the dataframe into a dictionary
data_dict = data.to_dict(orient='records')

In [27]:
# Insert the data into the MongoDB collection
collection.insert_many(data_dict)

InsertManyResult([ObjectId('662a5d22fb4e106c12a3512e'), ObjectId('662a5d22fb4e106c12a3512f'), ObjectId('662a5d22fb4e106c12a35130'), ObjectId('662a5d22fb4e106c12a35131'), ObjectId('662a5d22fb4e106c12a35132'), ObjectId('662a5d22fb4e106c12a35133'), ObjectId('662a5d22fb4e106c12a35134'), ObjectId('662a5d22fb4e106c12a35135'), ObjectId('662a5d22fb4e106c12a35136'), ObjectId('662a5d22fb4e106c12a35137'), ObjectId('662a5d22fb4e106c12a35138'), ObjectId('662a5d22fb4e106c12a35139'), ObjectId('662a5d22fb4e106c12a3513a'), ObjectId('662a5d22fb4e106c12a3513b'), ObjectId('662a5d22fb4e106c12a3513c'), ObjectId('662a5d22fb4e106c12a3513d'), ObjectId('662a5d22fb4e106c12a3513e'), ObjectId('662a5d22fb4e106c12a3513f'), ObjectId('662a5d22fb4e106c12a35140'), ObjectId('662a5d22fb4e106c12a35141'), ObjectId('662a5d22fb4e106c12a35142'), ObjectId('662a5d22fb4e106c12a35143'), ObjectId('662a5d22fb4e106c12a35144'), ObjectId('662a5d22fb4e106c12a35145'), ObjectId('662a5d22fb4e106c12a35146'), ObjectId('662a5d22fb4e106c12a351

In [28]:
# Close the connection
client.close()

Work done by:
- Carlos Gomes (PG51681);
- Ian Alves (PG51682).

The information related to the work can be found on: https://github.com/CarlosGomes00/BigData