Iniciando Sess√£o SPARK

spark.default.parallelism:
Este par√¢metro define o n√∫mero padr√£o de parti√ß√µes a serem criadas ao ler arquivos e ao executar opera√ß√µes como mapas em RDDs. Definir isso para um valor mais alto pode aumentar o n√∫mero de tarefas.

spark.sql.shuffle.partitions:
Esse par√¢metro define o n√∫mero de parti√ß√µes a serem usadas ao embaralhar dados para opera√ß√µes de jun√ß√£o, agrega√ß√£o, etc., em DataFrames. Aumentar esse valor pode aumentar o n√∫mero de tarefas geradas durante essas opera√ß√µes.

spark.executor.instances e spark.executor.cores:
Estes par√¢metros controlam o n√∫mero de executores e o n√∫mero de n√∫cleos por executor, respectivamente. Aumentar o n√∫mero de executores e/ou o n√∫mero de n√∫cleos por executor pode aumentar o n√∫mero total de tarefas que podem ser executadas simultaneamente.

spark.task.cpus:
Este par√¢metro define o n√∫mero m√°ximo de n√∫cleos de CPU a serem usados ‚Äã‚Äãpor tarefa. Aumentar esse valor pode permitir que mais tarefas sejam executadas simultaneamente em um √∫nico executor.


spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.default.parallelism", "100") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "4") \
    .config("spark.task.cpus", "1") \
    .config("spark.ui.retainedTasks", "10") \
    .getOrCreate()

In [20]:
!pip install pymongo
!pip install findspark
!pip install python-dotenv


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m26.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m26.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m26.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [21]:

from pyspark.sql import SparkSession
import findspark
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()


os.environ["spark_home"] = "\\spark-3.1.2-bin-hadoop2.7"
findspark.init()
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config("spark.default.parallelism", "100") \
    .config("spark.executor.instances", "10") \
    .config("spark.executor.cores", "10") \
    .getOrCreate()
spark

**Importando Lista de IPs**

In [22]:
# Importe a biblioteca requests
import requests

# Fa√ßa uma solicita√ß√£o HTTP para obter o conte√∫do do arquivo
banlist = "http://cinsscore.com/list/ci-badguys.txt"
response = requests.get(banlist)
lines = response.text.splitlines()
lines = lines[:100] #pegar somente 10 IPs

# Cria um RDD (Resilient Distributed Dataset) chamando ips a partir das linhas fornecidas,
# distribuindo essas linhas entre os n√≥s do cluster Spark
# para processamento paralelo. Este RDD pode ent√£o ser usado
# para realizar opera√ß√µes distribu√≠das em larga escala nos dados.
ips = spark.sparkContext.parallelize(lines)

**Fun√ß√£o para acessar a API para enriquecer dados**

In [23]:
#Fun√ß√£o para acrescentar dados ao IP
import requests

#IP INFO
def get_ip_info(ip):
    IPINFO_AK = os.getenv("IPINFO_AK")

    url = "https://ipinfo.io/{}/json"
    headers = {
        "Authorization": f"Bearer {IPINFO_AK}"
    }
    response = requests.get(url.format(ip), headers=headers)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Erro {response.status_code}: {response.text}")
        return {}

#ABUSE IP PD
def abuseipdb(ip):
    url = 'https://api.abuseipdb.com/api/v2/check'
    API_KEY = os.getenv("ABUSE_AK")

    querystring = {
        'ipAddress': ip,
        'maxAgeInDays': '90'  # Per√≠odo de tempo para considerar relat√≥rios (em dias)
    }
    headers = {
        'Accept': 'application/json',
        'Key': API_KEY
    }

    response = requests.get(url, headers=headers, params=querystring)
    if response.status_code == 200:
        return response.json()
    else:
        return {}

#print(abuseipdb('1.14.3.240'))
#print(get_ip_info('1.14.3.240'))

**Script para enriquecer dados**

In [24]:
import json
import datetime as dt

# Pegando a quantidade de IPs importados
num_ips = ips.count()
print("N√∫mero total de IPs:", num_ips)

if num_ips > 0:
    inicio_tarefa = dt.datetime.now()

    # Coleta as informa√ß√µes para todos os IPs de uma s√≥ vez
    ip_infos = {ip: get_ip_info(ip.strip()) for ip in ips.collect()}

    # Coleta os dados do abuseipdb para todos os IPs de uma s√≥ vez
    abuse_infos = {ip: abuseipdb(ip) for ip in ips.collect()}

    # Map para enriquecer os dados
    enriched_data_rdd = ips.map(lambda ip: {
        'listIP': ip,
        'listName': 'cinsscore',
        'RecommendedUsage': 'Incoming',
        'Description': 'cinsscore all list contains IP addresses.',
        'Timestamp': dt.datetime.now().isoformat(),

        # Informa√ß√µes do get_ip_info
        'city': ip_infos[ip].get('city', ''),
        'region': ip_infos[ip].get('region', ''),
        'country': ip_infos[ip].get('country', ''),
        'latitude': ip_infos[ip].get('loc', '').split(',')[0],
        'longitude': ip_infos[ip].get('loc', '').split(',')[1],
        'org': ip_infos[ip].get('org', ''),
        'postal': ip_infos[ip].get('postal', ''),
        'timezone': ip_infos[ip].get('timezone', ''),

        # Informa√ß√µes do abuseipdb
        'isPublic': abuse_infos[ip].get('data', {}).get('isPublic', ''),
        'ipVersion': abuse_infos[ip].get('data', {}).get('ipVersion', ''),
        'isWhitelisted': abuse_infos[ip].get('data', {}).get('isWhitelisted', 'False'),
        'abuseConfidenceScore': abuse_infos[ip].get('data', {}).get('abuseConfidenceScore', ''),
        'usageType': abuse_infos[ip].get('data', {}).get('usageType', ''),
        'isp': abuse_infos[ip].get('data', {}).get('isp', ''),
        'domain': abuse_infos[ip].get('data', {}).get('domain', ''),
        'totalReports': abuse_infos[ip].get('data', {}).get('totalReports', 0),
        'lastReportedAt': abuse_infos[ip].get('data', {}).get('lastReportedAt', ''),
        'numDistinctUsers': abuse_infos[ip].get('data', {}).get('numDistinctUsers', 0)
    })

    fim_tarefa = dt.datetime.now()
    print('Dura√ß√£o enriquecimento:', fim_tarefa - inicio_tarefa)

                                                                                

N√∫mero total de IPs: 100
Dura√ß√£o enriquecimento: 0:01:20.055399


 **Carga no MongoDB (UPSERT)**

In [29]:
from pymongo import MongoClient
import datetime as dt

# --- Coletar os resultados do RDD localmente ---
enriched_data = enriched_data_rdd.collect()

print(f"üì¶ Registros a inserir: {len(enriched_data)}")

if not enriched_data:
    raise Exception("Nenhum dado para inserir no MongoDB")

# --- Conex√£o MongoDB ---
MONGO_URI = os.environ.get("MONGO_URI")
DB_NAME = "etl_ips"
COLLECTION_NAME = "cinsscore_ips"

client = MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]

# (Opcional) limpar cole√ß√£o antes de inserir tudo novamente
# collection.delete_many({})
# print("üóëÔ∏è Cole√ß√£o limpa")

inicio_tarefa = dt.datetime.now()

# --- INSER√á√ÉO EM MASSA ---
result = collection.insert_many(enriched_data)

fim_tarefa = dt.datetime.now()

print("‚è±Ô∏è Dura√ß√£o ingest√£o MongoDB:", fim_tarefa - inicio_tarefa)
print(f"‚úÖ {len(result.inserted_ids)} documentos inseridos com sucesso")


AttributeError: 'NoneType' object has no attribute 'setCallSite'

In [26]:
spark.stop()

**MAPA COM GEOCODIFICA√á√ÉO**

In [27]:
!pip install folium pymongo geopy > /dev/null


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m26.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [28]:
from pymongo import MongoClient
import folium
from geopy.geocoders import Nominatim
from time import sleep

# üîå MongoDB
MONGO_URI = os.environ.get("MONGO_URI")
DB_NAME = "etl_ips"
COLLECTION_NAME = "cinsscore_ips"

client = MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]

docs = list(collection.find({
    "city": {"$ne": ""},
    "country": {"$ne": ""}
}))

print(f"üìä Registros com cidade/pa√≠s: {len(docs)}")

# üåç Geocoder (OpenStreetMap)
geolocator = Nominatim(user_agent="ip-map-colab")

# üó∫Ô∏è Mapa base (mundo)
mapa = folium.Map(
    location=[20, 0],
    zoom_start=2,
    tiles="CartoDB positron"
)

for d in docs:
    city = d.get("city")
    country = d.get("country")

    try:
        location = geolocator.geocode(f"{city}, {country}", timeout=10)
        sleep(1)  # evita bloqueio do servi√ßo

        if not location:
            continue

        lat, lon = location.latitude, location.longitude

        popup = f"""
        <b>IP:</b> {d.get('listIP')}<br>
        <b>Cidade:</b> {city}<br>
        <b>Pa√≠s:</b> {country}<br>
        <b>Score:</b> {d.get('abuseConfidenceScore', 0)}<br>
        <b>Usu√°rios distintos:</b> {d.get('numDistinctUsers', 0)}<br>
        <b>Total Reports:</b> {d.get('totalReports', 0)}
        """

        folium.CircleMarker(
            location=[lat, lon],
            radius=6 + min(d.get("totalReports", 0) / 20, 10),
            popup=popup,
            color="red" if d.get("totalReports", 0) > 0 else "blue",
            fill=True,
            fill_opacity=0.7
        ).add_to(mapa)

    except Exception as e:
        continue

# üñ•Ô∏è Exibir
mapa


üìä Registros com cidade/pa√≠s: 200
