In [78]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, lit, lpad, substring, to_date, col, when
import requests

import numpy as np
import time

In [48]:
def createRegionState(data):
    """
    Add the "REGIAO" and "ESTADO" columns, both derived from the "UF" column.

    Args:
        data (pyspark.sql.DataFrame): DataFrame with a "UF" column holding the
            two-digit state code.

    Returns:
        pyspark.sql.DataFrame: `data` plus "REGIAO" (region name) and "ESTADO"
            (state name). Codes that match no entry below become "NA".
    """
    uf = col("UF")

    # Region name -> UF codes belonging to it (evaluated in this order).
    region_table = [
        ("Norte", [11, 12, 13, 14, 15, 16, 17]),
        ("Nordeste", [21, 22, 23, 24, 25, 26, 27, 28, 29]),
        ("Sudeste", [31, 32, 33, 35]),
        ("Sul", [41, 42, 43]),
        ("Centro Oeste", [50, 51, 52, 53]),
    ]

    # UF code -> state name (evaluated in this order).
    state_table = [
        (12, "Acre"),
        (27, "Alagoas"),
        (16, "Amapá"),
        (13, "Amazonas"),
        (29, "Bahia"),
        (23, "Ceará"),
        (53, "Distrito Federal"),
        (32, "Espírito Santo"),
        (52, "Goiás"),
        (21, "Maranhão"),
        (51, "Mato Grosso"),
        (50, "Mato Grosso do Sul"),
        (31, "Minas Gerais"),
        (15, "Pará"),
        (25, "Paraíba"),
        (41, "Paraná"),
        (26, "Pernambuco"),
        (22, "Piauí"),
        (24, "Rio Grande do Norte"),
        (43, "Rio Grande do Sul"),
        (33, "Rio de Janeiro"),
        (11, "Rondônia"),
        (14, "Roraima"),
        (42, "Santa Catarina"),
        (35, "São Paulo"),
        (28, "Sergipe"),
        (17, "Tocantins"),
    ]

    def _case_expression(cases):
        # Fold (condition, label) pairs into when(..).when(..)...otherwise("NA").
        (head_cond, head_label), *tail = cases
        expr = when(head_cond, head_label)
        for cond, label in tail:
            expr = expr.when(cond, label)
        return expr.otherwise("NA")

    region_expr = _case_expression(
        [(uf.isin(codes), region) for region, codes in region_table]
    )
    state_expr = _case_expression(
        [(uf == code, state) for code, state in state_table]
    )

    return data.withColumn("REGIAO", region_expr).withColumn("ESTADO", state_expr)

def CreateState(data):
    """
    Add an "ESTADO" column with the state name matching the "UF" code.

    Args:
        data (pyspark.sql.DataFrame): DataFrame with a "UF" column holding the
            two-digit state code.

    Returns:
        pyspark.sql.DataFrame: `data` plus the "ESTADO" column; codes that are
            not in the lookup table become "NA".
    """
    # Insertion order is the order in which the conditions are evaluated.
    uf_to_state = {
        12: "Acre",
        27: "Alagoas",
        16: "Amapá",
        13: "Amazonas",
        29: "Bahia",
        23: "Ceará",
        53: "Distrito Federal",
        32: "Espírito Santo",
        52: "Goiás",
        21: "Maranhão",
        51: "Mato Grosso",
        50: "Mato Grosso do Sul",
        31: "Minas Gerais",
        15: "Pará",
        25: "Paraíba",
        41: "Paraná",
        26: "Pernambuco",
        22: "Piauí",
        24: "Rio Grande do Norte",
        43: "Rio Grande do Sul",
        33: "Rio de Janeiro",
        11: "Rondônia",
        14: "Roraima",
        42: "Santa Catarina",
        35: "São Paulo",
        28: "Sergipe",
        17: "Tocantins",
    }

    uf = col("UF")
    entries = iter(uf_to_state.items())

    # The first entry opens the CASE expression; the rest are chained onto it.
    code, state = next(entries)
    state_expr = when(uf == code, state)
    for code, state in entries:
        state_expr = state_expr.when(uf == code, state)

    # Default value if no condition matches
    state_expr = state_expr.otherwise("NA")

    return data.withColumn("ESTADO", state_expr)

def read_csv_from_api(spark, api_url):
    """
    Download a ';'-separated CSV from a URL and load selected columns with Spark.

    The response is streamed to a dedicated temporary file, one per call.
    Spark evaluates DataFrames lazily, so the file backing a returned
    DataFrame must stay on disk, unchanged, until the DataFrame has been
    materialised. A single shared file name would let a later download
    silently replace the data of every DataFrame returned earlier. The
    temporary file is therefore NOT deleted here; removing it once the data
    has been persisted is left to the caller.

    Args:
        spark (SparkSession): Initialized Spark session
        api_url (str): URL of the CSV file to be read

    Returns:
        pyspark.sql.DataFrame | None: DataFrame with the columns "IDADEMAE",
            "RACACORMAE", "PESO", "DTNASC_padded" (the "DTNASC" value parsed
            as a ddMMyyyy date), "REGIAO" and "ESTADO"; None when the download
            fails (the error is printed).
    """
    # Local imports: the notebook's import cell does not provide these modules.
    import os
    import tempfile

    fd, local_path = tempfile.mkstemp(prefix='temp_api_data_', suffix='.csv')
    try:
        # Stream in chunks so the whole file is never held in memory; the
        # timeout (connect, read) keeps a stalled connection from hanging.
        with os.fdopen(fd, 'wb') as tmp_file, \
                requests.get(api_url, stream=True, timeout=(10, 120)) as response:
            response.raise_for_status()  # Raise an exception for bad status codes
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                tmp_file.write(chunk)
    except requests.RequestException as e:
        print(f"Error fetching data from API: {e}")
        os.remove(local_path)  # do not leave a partial download behind
        return None

    # Read the CSV file into a Spark DataFrame
    df = spark.read.csv(
        local_path,
        header=True,  # Assumes the first row is a header
        inferSchema=True,  # Automatically detect column types
        sep=';'
    )

    # Left-pad DTNASC to 8 characters (restores a dropped leading zero of the
    # day) and parse it as ddMMyyyy. The column keeps the name "DTNASC_padded"
    # so existing consumers of the result are unaffected.
    birth_date = to_date(lpad(col("DTNASC"), 8, '0'), 'ddMMyyyy').alias("DTNASC_padded")

    df = df.select(
        "IDADEMAE",
        "RACACORMAE",
        "CODMUNNASC",
        "PESO",
        birth_date,
    ).withColumn("UF", substring(col("CODMUNNASC"), 1, 2))  # first two characters = UF code

    # createRegionState adds both "REGIAO" and "ESTADO" from "UF".
    df = createRegionState(df)

    return df.drop("CODMUNNASC", "UF")

In [5]:
# Get the active Spark session, or create one if none exists yet.
spark = (
    SparkSession.builder
    .appName("CSV API Reader - LBW")
    .getOrCreate()
)

In [79]:
# Other SINASC CSV locations (ckan.saude.gov.br bucket), kept for reference:
#   https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINASC/DNOPEN23.csv
#   https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINASC/DNOPEN22.csv
#   https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINASC/SINASC_2021.csv
#   https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINASC/SINASC_2020.csv
#   https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINASC/SINASC_2019.csv
#   https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINASC/SINASC_2018.csv
#   https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINASC/SINASC_2017.csv
#   https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINASC/SINASC_2016.csv
# NOTE(review): this list previously named SINASC_2019.csv twice; the second
# entry is presumably SINASC_2018.csv -- confirm the file exists.

# Union the yearly files for 2011-2020 into `dt`, each year exactly once.
# (Seeding `dt` with the 2013 file and then looping over 2011-2020 would add
# 2013 twice.)
dt = None

for ano in range(11, 21):
    url = f'https://diaad.s3.sa-east-1.amazonaws.com/sinasc/SINASC_20{ano}.csv'
    print(url)
    dt_aux = read_csv_from_api(spark=spark, api_url=url)
    time.sleep(10)  # pause between downloads

    # read_csv_from_api returns None (after printing the error) when the
    # download fails; skip that year instead of crashing in union().
    if dt_aux is None:
        continue

    dt = dt_aux if dt is None else dt.union(dt_aux)

https://diaad.s3.sa-east-1.amazonaws.com/sinasc/SINASC_2011.csv
https://diaad.s3.sa-east-1.amazonaws.com/sinasc/SINASC_2012.csv
https://diaad.s3.sa-east-1.amazonaws.com/sinasc/SINASC_2013.csv
https://diaad.s3.sa-east-1.amazonaws.com/sinasc/SINASC_2014.csv
https://diaad.s3.sa-east-1.amazonaws.com/sinasc/SINASC_2015.csv
https://diaad.s3.sa-east-1.amazonaws.com/sinasc/SINASC_2016.csv
https://diaad.s3.sa-east-1.amazonaws.com/sinasc/SINASC_2017.csv
https://diaad.s3.sa-east-1.amazonaws.com/sinasc/SINASC_2018.csv
https://diaad.s3.sa-east-1.amazonaws.com/sinasc/SINASC_2019.csv
https://diaad.s3.sa-east-1.amazonaws.com/sinasc/SINASC_2020.csv


In [81]:

# One DataFrame per year: 2010-2020 come from the "diaad" bucket, 2021-2024
# from the ckan.saude.gov.br bucket (which uses different file names).
_diaad_template = 'https://diaad.s3.sa-east-1.amazonaws.com/sinasc/SINASC_20{}.csv'
_yearly_urls = [_diaad_template.format(_yy) for _yy in range(10, 21)] + [
    'https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINASC/SINASC_2021.csv',
    'https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINASC/DNOPEN22.csv',
    'https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINASC/DNOPEN23.csv',
    'https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SINASC/DNOPEN24.csv',
]

_yearly_frames = []
for _position, _yearly_url in enumerate(_yearly_urls):
    if _position > 0:
        time.sleep(10)  # wait between consecutive downloads (none after the last)
    _yearly_frames.append(read_csv_from_api(spark=spark, api_url=_yearly_url))

# Bind the results to the per-year names, in download order.
(dt_10, dt_11, dt_12, dt_13, dt_14,
 dt_15, dt_16, dt_17, dt_18, dt_19,
 dt_20, dt_21, dt_22, dt_23, dt_24) = _yearly_frames

