In [None]:
# !pip install matplotlib
import os
import pandas as pd
import pymongo

# Spark imports
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import functions as f
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType


# GeoSpark imports
from geopy.geocoders import Nominatim
from shapely.geometry import Point
from geospark.register import GeoSparkRegistrator
from geospark.sql.types import GeometryType
from geospark.utils import GeoSparkKryoRegistrator
import geopandas as gpd


# Credentials are taken from the process environment instead of being written
# into the notebook: anything typed in a cell is exposed to everyone who can
# read the file or its history. Export MINIO_PASSWORD and MONGO_URI before
# starting the kernel. Values that were previously committed in this cell
# should be treated as compromised and rotated.
os.environ.setdefault("MINIO_USERNAME", "grupo-02")  # account name only, not a secret

_missing_env = [
    name for name in ("MINIO_PASSWORD", "MONGO_URI") if not os.environ.get(name)
]
if _missing_env:
    # Fail here with a clear message rather than later with an opaque
    # authentication error from MinIO or MongoDB.
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_env)
    )

# MongoDB connection. MONGO_URI holds the full connection string
# (including user and password); results are written to the Taller1 database.
client = pymongo.MongoClient(os.environ["MONGO_URI"])
db_mongo = client.Taller1


In [None]:

# Arguments handed to spark-submit when PySpark starts the JVM.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.hadoop:hadoop-aws:3.2.2,io.delta:delta-core_2.12:1.1.0  pyspark-shell "
)

# Spark settings; get_spark_session copies every entry into the SparkConf.
config = {
    # --- Dependencies and Kubernetes deployment ---
    "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.2.2",
    "spark.kubernetes.namespace": "spark",
    "spark.kubernetes.container.image": "cronosnull/abd-spark-base:202301",
    # --- Executor sizing ---
    "spark.executor.instances": "15",
    "spark.executor.memory": "10g",
    "spark.executor.cores": "1",
    # --- Driver sizing and networking ---
    "spark.driver.memory": "5g",
    "spark.driver.port": "38891",
    "spark.driver.blockManager.port": "7779",
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.driver.host": "172.24.99.147",
    "spark.kubernetes.executor.request.cores": "500m",
    # --- S3A / MinIO access ---
    "spark.hadoop.fs.s3a.endpoint": "http://172.24.99.18:9000",
    # MinIO credentials come from environment variables; "--" is used as a
    # placeholder when a variable is not set.
    "spark.hadoop.fs.s3a.access.key": os.environ.get("MINIO_USERNAME", "--"),
    "spark.hadoop.fs.s3a.secret.key": os.environ.get("MINIO_PASSWORD", "--"),
    "spark.hadoop.fs.s3a.path.style.access": True,
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    "spark.kubernetes.local.dirs.tmpfs": True,
}
def get_spark_session(app_name: str, conf: SparkConf):
    """Return a SparkSession for ``app_name`` on the Kubernetes master.

    The given ``conf`` is modified in place: the master URL is set, every
    entry of the module-level ``config`` dict is applied, and finally
    ``spark.ui.port`` is set to 4041.

    Args:
        app_name: Application name given to the session builder.
        conf: SparkConf that receives the settings.

    Returns:
        The session from ``getOrCreate()``, i.e. an already existing session
        if there is one, otherwise a newly created one.
    """
    conf.setMaster("k8s://https://172.24.99.68:16443")
    # Same order as before: shared settings first, UI port last.
    conf.setAll(list(config.items()) + [("spark.ui.port", "4041")])
    builder = SparkSession.builder.appName(app_name)
    return builder.config(conf=conf).getOrCreate()

In [None]:
# Create the notebook's Spark session (get_spark_session ends in getOrCreate,
# so re-running this cell returns the existing session).
spark = get_spark_session("grupo-02-taller1", SparkConf())

In [None]:
# Shell out to spark-submit and print the version it reports.
!spark-submit --version


In [None]:
# Show the session object as the cell output.
spark

# Load data

In [None]:


def _load_ais_year(year, add_transceiver_class=False, sample_fraction=None):
    """Read one year of AIS CSV files and bring it to the shared column layout.

    Args:
        year: Year to load; files are read from
            ``s3a://ais/<year>/AIS_<year>*.csv.gz`` with a header row.
        add_transceiver_class: When True, add a ``TranscieverClass`` column
            holding the empty string (used for the 2017 files below).
        sample_fraction: Optional fraction passed to ``DataFrame.sample`` for
            quick experiments; ``None`` (the default) keeps every row.

    Returns:
        A DataFrame with ``BaseDateTime`` parsed as a timestamp, ``LAT`` and
        ``LON`` cast to double, and a ``documento`` column set to
        ``"AIS_<year>"``.
    """
    df_year = spark.read.csv(f"s3a://ais/{year}/AIS_{year}*.csv.gz", header=True)
    df_year = df_year.withColumn(
        "BaseDateTime", to_timestamp("BaseDateTime", "yyyy-MM-dd'T'HH:mm:ss")
    )
    if add_transceiver_class:
        # Must be added before 'documento' so the column order stays the same
        # as in the frames this one is unioned with.
        df_year = df_year.withColumn("TranscieverClass", lit(''))
    df_year = df_year.withColumn('LAT', df_year['LAT'].cast(DoubleType()))
    df_year = df_year.withColumn('LON', df_year['LON'].cast(DoubleType()))
    df_year = df_year.withColumn('documento', lit(f"AIS_{year}"))
    if sample_fraction is not None:
        df_year = df_year.sample(sample_fraction)
    return df_year


# One frame per year. For a quick trial run pass e.g. sample_fraction=0.00001.
df_1 = _load_ais_year(2020)
df_2 = _load_ais_year(2019)
df_3 = _load_ais_year(2018)
df_4 = _load_ais_year(2017, add_transceiver_class=True)

# union() pairs columns by position, not by name, so this relies on all four
# frames having their columns in the same order.
# NOTE(review): presumably the 2017 files lack TranscieverClass and the other
# years carry it as their last source column; verify against the data.
df_union = df_1.union(df_2).union(df_3).union(df_4)

# Repartition by the source label into 4 partitions and cache the result.
# NOTE(review): spark.executor.instances is 15 in the session config; confirm
# that limiting this frame to 4 partitions is intended.
df_pruebas = df_union.repartition(4, 'documento').cache()


### Add the State to the data

In [None]:
# Plain-Python function (wrapped as a Spark UDF elsewhere) that reverse-geocodes
# a latitude/longitude pair to a state name.
def get_state(lat, lon):
    """Return the state name for a coordinate, or None when it is unavailable.

    Args:
        lat: Latitude in decimal degrees, or None.
        lon: Longitude in decimal degrees, or None.

    Returns:
        The ``state`` entry of the reverse-geocoded address, or None when
        either coordinate is missing, the lookup fails, nothing is found, or
        the address has no non-empty ``state``.
    """
    # Rows whose LAT/LON could not be cast arrive as None; without this guard
    # the query string would literally be "None, None".
    if lat is None or lon is None:
        return None

    # Imported after the guard so the missing-coordinate path has no geopy
    # dependency; repeated imports are served from the module cache.
    from geopy.exc import GeopyError

    geolocator = Nominatim(user_agent="my_app")
    try:
        location = geolocator.reverse(f"{lat}, {lon}")
    except (GeopyError, ValueError):
        # GeopyError: timeouts, rate limiting and other service failures.
        # ValueError: coordinates geopy rejects (e.g. latitude outside
        # [-90, 90]). One bad row must not abort the whole Spark task.
        return None

    if location is None:
        return None
    address = location.raw.get('address') or {}
    return address.get('state') or None

# Wrap get_state as a Spark UDF that returns a string.
get_state_udf = udf(get_state, StringType())

# Add a "State" column computed from each row's LAT and LON.
df_with_state = df_pruebas.withColumn(
    "State",
    get_state_udf(col("LAT"), col("LON")),
)

df_with_state.printSchema()

### Add extra information from VesselTypeCodes

In [None]:
# Vessel type lookup table, stored as a semicolon-separated CSV with a header.
df_vessel_type_codes = spark.read.csv(
    "s3a://user-data/grupo-02/VesselTypeCodes2018.csv",
    sep=";",
    header=True,
)

# Inner join on equal VesselType, keeping every column from both frames
# (so VesselType appears once per side in the result).
_vessel_type_match = df_with_state["VesselType"] == df_vessel_type_codes["VesselType"]
df_with_vessel = df_with_state.join(
    df_vessel_type_codes, _vessel_type_match, "inner"
).select(df_with_state["*"], df_vessel_type_codes["*"])

df_with_vessel.printSchema()

### Upload data to MongoDB


In [None]:
# Derive the weekday name and a "yyyy MM dd" date string from BaseDateTime,
# then keep only the columns used by the aggregations.
df_enriched = (
    df_with_vessel
    .withColumn("WEEKDAY", date_format(col("BaseDateTime"), "EEEE"))
    .withColumn("DATE", date_format(col("BaseDateTime"), "yyyy MM dd"))
    .select("State", "WEEKDAY", "DATE", "SOG", "Classification", "Group", "Cargo")
)


In [None]:
# Items 1, 3 and 4: number of rows per (State, DATE).
df_1_3_4 = df_enriched.groupBy("State", "DATE").agg(count(lit(1)).alias("Cant"))

# Collect the aggregate to the driver as pandas and insert one MongoDB
# document per row into the Agrupacion1_3_4 collection.
df_1_3_4_pandas = df_1_3_4.toPandas()
db_mongo["Agrupacion1_3_4"].insert_many(df_1_3_4_pandas.to_dict(orient="records"))

In [None]:
# Items 2 and 5: number of rows per (Cargo, WEEKDAY, State, DATE).
df_2_5 = df_enriched.groupBy("Cargo", "WEEKDAY", "State", "DATE").agg(
    count(lit(1)).alias("Cant")
)

# Collect the aggregate to the driver as pandas and insert one MongoDB
# document per row into the Agrupacion2_5 collection.
df_2_5_pandas = df_2_5.toPandas()
db_mongo["Agrupacion2_5"].insert_many(df_2_5_pandas.to_dict(orient="records"))

In [None]:
# Item 6: mean of SOG per Group, stored under the name "Avg Velocity".
df_6 = df_enriched.groupBy("Group").agg(mean("SOG").alias("Avg Velocity"))

# Collect the aggregate to the driver as pandas and insert one MongoDB
# document per row into the Agrupacion6 collection.
df_6_pandas = df_6.toPandas()
db_mongo["Agrupacion6"].insert_many(df_6_pandas.to_dict(orient="records"))

In [None]:
# Item 7: number of rows per (Classification, DATE).
df_7 = df_enriched.groupBy("Classification", "DATE").agg(count(lit(1)).alias("Cant"))

# Collect the aggregate to the driver as pandas and insert one MongoDB
# document per row into the Agrupacion7 collection.
df_7_pandas = df_7.toPandas()
db_mongo["Agrupacion7"].insert_many(df_7_pandas.to_dict(orient="records"))