In [1]:
#Instalar Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
#Realizar o download do Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz

In [3]:
!tar xf spark-3.4.3-bin-hadoop3.tgz

In [4]:
!pip install -q findspark

In [5]:
#Importando a biblioteca os
import os
#Definindo a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

#Definindo a variável de ambiente do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.4.3-bin-hadoop3"

import findspark

#Iniciando o findspark
findspark.init('spark-3.4.3-bin-hadoop3')

In [6]:
# iniciar uma sessão local
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DoubleType
import pyspark.sql.functions as F
import time

os.environ['PYARROW_IGNORE_TIMEZONE'] = '1'
sc = SparkSession.builder.master('local[*]').config('spark.ui.port', '4050').enableHiveSupport().getOrCreate()
spark = SparkSession(sc)


In [30]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [33]:
import pyspark.pandas as pd
import time

os.environ["Drive_csvs"] = '/content/drive/MyDrive/HiveProject/csvs'
drive_csvs_path = os.environ.get("Drive_csvs")


print(os.listdir('/content/drive/MyDrive/HiveProject/csvs/'))

crime_file_path = os.path.join(drive_csvs_path,'crime.csv')
segment_file_path = os.path.join(drive_csvs_path, 'segment.csv')
district_file_path = os.path.join(drive_csvs_path, 'district.csv')
vertice_file_path = os.path.join(drive_csvs_path, 'vertice.csv')
time_file_path = os.path.join(drive_csvs_path, 'time.csv')
neighborhood_file_path = os.path.join(drive_csvs_path, 'neighborhood.csv')

['vertice.csv', 'time.csv', 'district.csv', 'neighborhood.csv', 'segment.csv', 'crime.csv']


In [34]:
df_crime = spark.read.csv(crime_file_path, header=True, sep=';',inferSchema=True)
df_crime.createOrReplaceTempView("df_crime")
df_crime = spark.sql("Select * from df_crime")

df_crime.write.saveAsTable("crime", format="parquet", mode="overwrite")

print("Data successfully created.")


Data successfully created.


In [35]:
df_segment = spark.read.csv(segment_file_path, header=True, sep=';',inferSchema=True)
df_segment.createOrReplaceTempView("df_segment")
df_segment = spark.sql("Select * from df_segment")

df_segment.write.saveAsTable("segment", format="parquet", mode="overwrite")

print("Data successfully created.")

Data successfully created.


In [36]:
df_district = spark.read.csv(district_file_path, header=True, sep=';',inferSchema=True)
df_district.createOrReplaceTempView("df_district")
df_district = spark.sql("Select * from df_district")

df_district.write.saveAsTable("district", format="parquet", mode="overwrite")

print("Data successfully created.")

Data successfully created.


In [37]:
df_vertice = spark.read.csv(vertice_file_path, header=True, sep=';',inferSchema=True)
df_vertice.createOrReplaceTempView("df_vertice")
df_vertice = spark.sql("Select * from df_vertice")

df_vertice.write.saveAsTable("vertice", format="parquet", mode="overwrite")

print("Data successfully created.")

Data successfully created.


In [38]:
df_time = spark.read.csv(time_file_path, header=True, sep=';',inferSchema=True)
df_time.createOrReplaceTempView("df_time")
df_time = spark.sql("Select * from df_time")

df_time.write.saveAsTable("time", format="parquet", mode="overwrite")

print("Data successfully created.")

Data successfully created.


In [39]:
df_neighborhood = spark.read.csv(neighborhood_file_path, header=True, sep=';',inferSchema=True)
df_neighborhood.createOrReplaceTempView("df_neighborhood")
df_neighborhood = spark.sql("Select * from df_neighborhood")

df_neighborhood.write.saveAsTable("neighborhood", format="parquet", mode="overwrite")

print("Data successfully created.")

Data successfully created.


In [48]:
start_time1 = time.time()
query_1 = sc.sql(f"""
SELECT
    s.id AS segment_id,
    SUM(c.total_feminicide) AS total_feminicide,
    SUM(c.total_homicide) AS total_homicide,
    SUM(c.total_felony_murder) AS total_felony_murder,
    SUM(c.total_bodily_harm) AS total_bodily_harm,
    SUM(c.total_theft_cellphone) AS total_theft_cellphone,
    SUM(c.total_armed_robbery_cellphone) AS total_armed_robbery_cellphone,
    SUM(c.total_theft_auto) AS total_theft_auto,
    SUM(c.total_armed_robbery_auto) AS total_armed_robbery_auto,
    s.geometry AS segment_geometry
FROM
    crime c
    JOIN segment s ON c.segment_id = s.id
    JOIN vertice v_start ON s.start_vertice_id = v_start.id
    JOIN district d ON v_start.district_id = d.id
    JOIN time t ON c.time_id = t.id
WHERE
    d.name = 'IGUATEMI'
    AND t.year = 2016
GROUP BY
    s.id, s.geometry
ORDER BY
    s.id;
""").show()
elapsed_time1 = time.time() - start_time1

print(f"Elapsed Time: {elapsed_time1} seconds")


+----------+----------------+--------------+-------------------+-----------------+---------------------+-----------------------------+----------------+------------------------+--------------------+
|segment_id|total_feminicide|total_homicide|total_felony_murder|total_bodily_harm|total_theft_cellphone|total_armed_robbery_cellphone|total_theft_auto|total_armed_robbery_auto|    segment_geometry|
+----------+----------------+--------------+-------------------+-----------------+---------------------+-----------------------------+----------------+------------------------+--------------------+
|      2191|               0|             0|                  0|                0|                    0|                           10|               0|                       5|0105000020E610000...|
|      2192|               0|             0|                  0|                0|                   10|                           15|               0|                       5|0105000020E610000...|
|      219

In [53]:
start_time2 = time.time()
query_2 = sc.sql(f"""
SELECT
    SUM(c.total_feminicide) AS total_feminicide,
    SUM(c.total_homicide) AS total_homicide,
    SUM(c.total_felony_murder) AS total_felony_murder,
    SUM(c.total_bodily_harm) AS total_bodily_harm,
    SUM(c.total_theft_cellphone) AS total_theft_cellphone,
    SUM(c.total_armed_robbery_cellphone) AS total_armed_robbery_cellphone,
    SUM(c.total_theft_auto) AS total_theft_auto,
    SUM(c.total_armed_robbery_auto) AS total_armed_robbery_auto,
    c.segment_id,
    s.geometry
FROM
    crime c
    JOIN segment s ON c.segment_id = s.id
    JOIN vertice v_start ON s.start_vertice_id = v_start.id
    JOIN district d ON v_start.district_id = d.id
    JOIN time t on c.time_id = t.id
WHERE
    d.name = 'IGUATEMI'
    AND t.year BETWEEN 2006 AND 2016
GROUP BY
    c.segment_id, s.geometry;
""").show()
elapsed_time2 = time.time() - start_time2

print(f"Elapsed Time: {elapsed_time2} seconds")

+----------------+--------------+-------------------+-----------------+---------------------+-----------------------------+----------------+------------------------+----------+--------------------+
|total_feminicide|total_homicide|total_felony_murder|total_bodily_harm|total_theft_cellphone|total_armed_robbery_cellphone|total_theft_auto|total_armed_robbery_auto|segment_id|            geometry|
+----------------+--------------+-------------------+-----------------+---------------------+-----------------------------+----------------+------------------------+----------+--------------------+
|               0|             0|                  0|                0|                    0|                           25|               5|                      20|      2191|0105000020E610000...|
|               0|             5|                  0|                0|                   35|                          135|              10|                     175|      2192|0105000020E610000...|
|         

In [60]:
start_time3 = time.time()
query_3 = sc.sql(f"""
SELECT
    SUM(c.total_theft_cellphone) AS total_theft_cellphone,
    SUM(c.total_theft_auto) AS total_theft_auto
FROM
    crime c
    JOIN segment s ON c.segment_id = s.id
    JOIN vertice v_start ON s.start_vertice_id = v_start.id
    JOIN neighborhood n ON v_start.neighborhood_id = n.id
    JOIN time t ON c.time_id = t.id
WHERE
    n.name = 'SANTA EFIGÊNIA'
    AND t.year = 2015;
""").show()
elapsed_time3 = time.time() - start_time3

print(f"Elapsed Time: {elapsed_time3} seconds")

+---------------------+----------------+
|total_theft_cellphone|total_theft_auto|
+---------------------+----------------+
|                 null|            null|
+---------------------+----------------+

Elapsed Time: 1.8367407321929932 seconds


In [63]:
start_time4 = time.time()
query_4 = sc.sql(f"""
SELECT c.total_feminicide, c.total_homicide, c.total_felony_murder, c.total_bodily_harm,
       c.total_theft_cellphone, c.total_armed_robbery_cellphone, c.total_theft_auto, c.total_armed_robbery_auto
FROM crime c
JOIN segment s ON c.segment_id = s.id
JOIN time t on c.time_id = t.id
WHERE s.oneway = 'yes'
  AND t.year = 2012;
""").show()
elapsed_time4 = time.time() - start_time4

print(f"Elapsed Time: {elapsed_time4} seconds")

+----------------+--------------+-------------------+-----------------+---------------------+-----------------------------+----------------+------------------------+
|total_feminicide|total_homicide|total_felony_murder|total_bodily_harm|total_theft_cellphone|total_armed_robbery_cellphone|total_theft_auto|total_armed_robbery_auto|
+----------------+--------------+-------------------+-----------------+---------------------+-----------------------------+----------------+------------------------+
|               0|             0|                  0|                0|                    0|                            1|               0|                       0|
|               0|             0|                  0|                0|                    0|                            1|               0|                       0|
|               0|             0|                  0|                0|                    0|                            1|               0|                       0|
|   

In [65]:
start_time5 = time.time()
query_5 = sc.sql(f"""
SELECT SUM(c.total_theft_cellphone) AS total_theft_cellphone, SUM(c.total_theft_auto) AS total_theft_auto
FROM crime c
JOIN segment s ON c.segment_id = s.id
JOIN time t on c.time_id = t.id
WHERE t.year = 2017;
""").show()
elapsed_time5 = time.time() - start_time5

print(f"Elapsed Time: {elapsed_time5} seconds")

+---------------------+----------------+
|total_theft_cellphone|total_theft_auto|
+---------------------+----------------+
|               421435|          214560|
+---------------------+----------------+

Elapsed Time: 1.6879816055297852 seconds


In [66]:
start_time6 = time.time()
query_6 = sc.sql(f"""
SELECT c.segment_id, SUM(c.total_feminicide + c.total_homicide + c.total_felony_murder + c.total_bodily_harm +
                          c.total_theft_cellphone + c.total_armed_robbery_cellphone +
                          c.total_theft_auto + c.total_armed_robbery_auto) AS total_crimes
FROM crime c
JOIN time t on c.time_id = t.id
WHERE t.month = 11 AND t.year = 2010
GROUP BY c.segment_id
ORDER BY total_crimes DESC;
""").show()
elapsed_time6 = time.time() - start_time6

print(f"Elapsed Time: {elapsed_time6} seconds")

+----------+------------+
|segment_id|total_crimes|
+----------+------------+
|     34854|          24|
+----------+------------+

Elapsed Time: 2.0041565895080566 seconds


In [70]:
start_time7 = time.time()
query_7 = sc.sql(f"""
SELECT c.segment_id, SUM(c.total_feminicide + c.total_homicide + c.total_felony_murder + c.total_bodily_harm +
                          c.total_theft_cellphone + c.total_armed_robbery_cellphone +
                          c.total_theft_auto + c.total_armed_robbery_auto) AS total_crimes
FROM crime c
JOIN time t ON c.time_id = t.id
WHERE t.year = 2018 AND t.weekday IN ('saturday', 'sunday')
GROUP BY c.segment_id
ORDER BY total_crimes DESC;
""").show()
elapsed_time7 = time.time() - start_time7

print(f"Elapsed Time: {elapsed_time7} seconds")

+----------+------------+
|segment_id|total_crimes|
+----------+------------+
|    160538|        1564|
|    149515|         678|
|    160606|         616|
|    160607|         597|
|    111697|         590|
|    127455|         573|
|    132899|         446|
|    172841|         430|
|    201751|         422|
|    155778|         417|
|    226199|         415|
|    131081|         394|
|    111689|         378|
|     59483|         372|
|    219103|         365|
|    205851|         350|
|    202703|         346|
|    113146|         306|
|     22731|         300|
|    114926|         286|
+----------+------------+
only showing top 20 rows

Elapsed Time: 1.1925725936889648 seconds


In [71]:
print(f"""

Tempo de cada consulta:

1){elapsed_time1}
2){elapsed_time2}
3){elapsed_time3}
4){elapsed_time4}
5){elapsed_time5}
6){elapsed_time6}
7){elapsed_time7}

tempo total: {elapsed_time1 + elapsed_time2 + elapsed_time3 + elapsed_time4 + elapsed_time5 + elapsed_time6 + elapsed_time7} segundos
""")



Tempo de cada consulta:

1)21.77898097038269
2)13.126424074172974
3)1.8367407321929932
4)1.3910479545593262
5)1.6879816055297852
6)2.0041565895080566
7)1.1925725936889648

tempo total: 43.01790452003479



In [79]:
import concurrent.futures
import threading
import time

# Define your queries
query_1 = '''SELECT
    s.id AS segment_id,
    SUM(c.total_feminicide) AS total_feminicide,
    SUM(c.total_homicide) AS total_homicide,
    SUM(c.total_felony_murder) AS total_felony_murder,
    SUM(c.total_bodily_harm) AS total_bodily_harm,
    SUM(c.total_theft_cellphone) AS total_theft_cellphone,
    SUM(c.total_armed_robbery_cellphone) AS total_armed_robbery_cellphone,
    SUM(c.total_theft_auto) AS total_theft_auto,
    SUM(c.total_armed_robbery_auto) AS total_armed_robbery_auto,
    s.geometry AS segment_geometry
FROM
    crime c
    JOIN segment s ON c.segment_id = s.id
    JOIN vertice v_start ON s.start_vertice_id = v_start.id
    JOIN district d ON v_start.district_id = d.id
    JOIN time t ON c.time_id = t.id
WHERE
    d.name = 'IGUATEMI'
    AND t.year = 2016
GROUP BY
    s.id, s.geometry
ORDER BY
    s.id;'''
query_2 = """
SELECT
    SUM(c.total_feminicide) AS total_feminicide,
    SUM(c.total_homicide) AS total_homicide,
    SUM(c.total_felony_murder) AS total_felony_murder,
    SUM(c.total_bodily_harm) AS total_bodily_harm,
    SUM(c.total_theft_cellphone) AS total_theft_cellphone,
    SUM(c.total_armed_robbery_cellphone) AS total_armed_robbery_cellphone,
    SUM(c.total_theft_auto) AS total_theft_auto,
    SUM(c.total_armed_robbery_auto) AS total_armed_robbery_auto,
    c.segment_id,
    s.geometry
FROM
    crime c
    JOIN segment s ON c.segment_id = s.id
    JOIN vertice v_start ON s.start_vertice_id = v_start.id
    JOIN district d ON v_start.district_id = d.id
    JOIN time t on c.time_id = t.id
WHERE
    d.name = 'IGUATEMI'
    AND t.year BETWEEN 2006 AND 2016
GROUP BY
    c.segment_id, s.geometry;
"""
query_3 = """
SELECT
    SUM(c.total_theft_cellphone) AS total_theft_cellphone,
    SUM(c.total_theft_auto) AS total_theft_auto
FROM
    crime c
    JOIN segment s ON c.segment_id = s.id
    JOIN vertice v_start ON s.start_vertice_id = v_start.id
    JOIN neighborhood n ON v_start.neighborhood_id = n.id
    JOIN time t ON c.time_id = t.id
WHERE
    n.name = 'SANTA EFIGÊNIA'
    AND t.year = 2015;
"""
query_4 = """
SELECT c.total_feminicide, c.total_homicide, c.total_felony_murder, c.total_bodily_harm,
       c.total_theft_cellphone, c.total_armed_robbery_cellphone, c.total_theft_auto, c.total_armed_robbery_auto
FROM crime c
JOIN segment s ON c.segment_id = s.id
JOIN time t on c.time_id = t.id
WHERE s.oneway = 'yes'
  AND t.year = 2012;
"""
query_5 = """
SELECT SUM(c.total_theft_cellphone) AS total_theft_cellphone, SUM(c.total_theft_auto) AS total_theft_auto
FROM crime c
JOIN segment s ON c.segment_id = s.id
JOIN time t on c.time_id = t.id
WHERE t.year = 2017;
"""
query_6 = """
SELECT c.segment_id, SUM(c.total_feminicide + c.total_homicide + c.total_felony_murder + c.total_bodily_harm +
                          c.total_theft_cellphone + c.total_armed_robbery_cellphone +
                          c.total_theft_auto + c.total_armed_robbery_auto) AS total_crimes
FROM crime c
JOIN time t on c.time_id = t.id
WHERE t.month = 11 AND t.year = 2010
GROUP BY c.segment_id
ORDER BY total_crimes DESC;
"""
query_7 = """
SELECT c.segment_id, SUM(c.total_feminicide + c.total_homicide + c.total_felony_murder + c.total_bodily_harm +
                          c.total_theft_cellphone + c.total_armed_robbery_cellphone +
                          c.total_theft_auto + c.total_armed_robbery_auto) AS total_crimes
FROM crime c
JOIN time t ON c.time_id = t.id
WHERE t.year = 2018 AND t.weekday IN ('saturday', 'sunday')
GROUP BY c.segment_id
ORDER BY total_crimes DESC;
"""
querys = [
    ('query1', query_1),
    ('query2', query_2),
    ('query3', query_3),
    ('query4', query_4),
    ('query5', query_5),
    ('query6', query_6),
    ('query7', query_7)
]

# Initialize the total timer and lock
total_timer_paralel = 0
total_timer_lock = threading.Lock()

# Function to execute a query and update the total timer
def execute_query(query_id, query_string):
    global total_timer_paralel
    start_time = time.time()
    try:
        result_df = spark.sql(query_string)
        execution_time = time.time() - start_time
        with total_timer_lock:
            total_timer_paralel += execution_time
        print(f"Query {query_id} completed successfully in {execution_time} seconds.")
    except Exception as e:
        print(f"Error executing query {query_id}: {e}")

# Create an executor pool with a limited number of threads (adjust as needed)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

# Submit each query to the executor pool for asynchronous execution
future_to_query = {}
for query_id, query_string in querys:
    future = executor.submit(execute_query, query_id, query_string)
    future_to_query[future] = query_id

# Wait for all queries to complete
for future in concurrent.futures.as_completed(future_to_query):
    query_id = future_to_query[future]
    try:
        future.result()
    except Exception as e:
        print(f"Error executing query {query_id}: {e}")

print(f"Total parallel execution time: {total_timer_paralel} seconds.")


Query query4 completed successfully in 0.7937121391296387 seconds.
Query query2 completed successfully in 0.8852601051330566 seconds.
Query query3 completed successfully in 0.984917163848877 seconds.
Query query1 completed successfully in 1.1049091815948486 seconds.
Query query6 completed successfully in 0.26279139518737793 seconds.
Query query5 completed successfully in 0.38533759117126465 seconds.
Query query7 completed successfully in 0.1967928409576416 seconds.
Total parallel execution time: 4.613720417022705 seconds.
