In [None]:
# ========================== CÉLULA 1 ==========================
# Linhas que começam com "!" são COMANDOS DE SHELL executados pelo Jupyter
# diretamente no sistema operacional (Ubuntu/Debian no Colab/WSL/etc.). Esses
# comandos NÃO rodam no interpretador Python — eles chamam binários do SO.

# Atualiza o índice de pacotes do apt (lista local de pacotes/versões disponíveis nos repositórios).
# Não instala nada; apenas sincroniza metadados para que instalações futuras encontrem as versões mais novas.
!apt-get update

# Instala o JDK 8 (headless = sem componentes gráficos).
# O Apache Spark exige uma JVM. Versões do Spark são compatíveis com Java 8/11/17 (verifique a sua).
# O " -qq > /dev/null " suprime saídas detalhadas, deixando o output mais limpo.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# ---------- A partir daqui é Python ----------
import os  # Módulo padrão do Python para interagir com o Sistema Operacional (variáveis de ambiente, paths, etc.)

# Variável de ambiente JAVA_HOME: aponta para o diretório raiz do JDK.
# Diversas ferramentas (incluindo o Spark) usam isso para localizar 'java' e 'javac'.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# update-alternatives permite escolher qual versão do 'java' será o padrão do sistema
# quando existem múltiplas instalações. Aqui, fixamos a versão instalada acima.
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

# Checa a versão ativa do Java para confirmar que a configuração está OK.
!java -version

# Instala o cliente PySpark (biblioteca Python que conversa com o engine do Spark).
# Sem isso, "from pyspark import ..." não funcionaria.
!pip install pyspark

# Baixa (clona) o repositório com os arquivos de exemplo e os datasets (logs da NASA).
# "git clone <url>" cria uma pasta local "aulapython/" com o conteúdo remoto.
!git clone https://github.com/leonardoamorim/aulapython.git

# Lista os arquivos dentro de "aulapython/" para verificar o clone com sucesso.
! ls aulapython/

# Descompacta os arquivos de log (.gz) para .txt (sem compressão), assim o Spark e o shell
# conseguem ler normalmente. 'gunzip' substitui o arquivo .gz pelo extraído.
! gunzip aulapython/NASA_access_log_Jul95.gz
! gunzip aulapython/NASA_access_log_Aug95.gz

# Mostra o tamanho (uso de disco) de cada arquivo (opção -h = "human readable": KB/MB/GB).
! du -h aulapython/NASA_access_log_Jul95
! du -h aulapython/NASA_access_log_Aug95

# Conta quantas linhas existem (cada linha = 1 requisição registrada).
! wc -l aulapython/NASA_access_log_Jul95
! wc -l aulapython/NASA_access_log_Aug95

# Mostra as 10 primeiras linhas do arquivo de Julho para inspecionar o formato:
# Formato comum (Common Log Format estendido):
# host ident authuser [date:time zone] "request" status bytes
! head -n10 aulapython/NASA_access_log_Jul95

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
0% [Waiting for headers] [1 InRelease 5,484 B/129 kB 4%] [Waiting for headers]                                                                                Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [1 InRelease 5,484 B/129 kB 4%] [Waiting for headers] [Connected to r2u.stat                                                                               Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Get:4 https://cli.github.com/packages stable InRelease [3,917 B]
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:9 https://cli.github.com/packages stable/main amd64

In [None]:
# ========================== CÉLULA 2 ==========================
"""
# Conjunto de dados distribuído resiliente (RDD)

O RDD é a API de baixo nível do Spark: uma coleção distribuída e imutável,
particionada e processável em paralelo. Você aplica TRANSFORMAÇÕES (lazy),
como `map`, `filter`, `reduceByKey`, e executa com AÇÕES como `count`,
`collect`, `take`.

Quando usar RDDs?
- Quando precisa de controle de baixo nível.
- Dados não estruturados (logs, texto).
- Preferência por programação funcional ao invés de expressões SQL.
- Não precisa impor esquema (colunas).
- Aceita abrir mão de otimizações automáticas de DataFrames (Catalyst/Tungsten).
"""

'\n# Conjunto de dados distribuído resiliente (RDD)\n\nO RDD é a API de baixo nível do Spark: uma coleção distribuída e imutável,\nparticionada e processável em paralelo. Você aplica TRANSFORMAÇÕES (lazy),\ncomo `map`, `filter`, `reduceByKey`, e executa com AÇÕES como `count`,\n`collect`, `take`.\n\nQuando usar RDDs?\n- Quando precisa de controle de baixo nível.\n- Dados não estruturados (logs, texto).\n- Preferência por programação funcional ao invés de expressões SQL.\n- Não precisa impor esquema (colunas).\n- Aceita abrir mão de otimizações automáticas de DataFrames (Catalyst/Tungsten).\n'

In [None]:
# ========================== CÉLULA 3 ==========================
# "from pyspark import SparkConf, SparkContext"
# - SparkConf: objeto de configuração (master, appName, memória, etc).
# - SparkContext: ponto de entrada da API de RDD (baixo nível).
from pyspark import SparkConf, SparkContext

# 'add' é a função de soma do módulo 'operator', usada em reduções (reduceByKey(add))
# para somar contagens por chave de forma legível e eficiente.
from operator import add

# Cria uma configuração de Spark:
# - SparkConf(): construtor do objeto de configuração do Spark.
# - .setMaster("local"): executa localmente com 1 thread (útil p/ testes). DICA: "local[*]" usa todos os cores.
# - .setAppName("Exercicio Nasa Logs"): nome que aparece na UI do Spark e nos logs.
# - .set("spark.executor.memory", "5g"): memória destinada a cada executor (~5 GiB).
#   Em modo "local", você terá 1 executor; em cluster, isso vale por executor.
configuracao = (SparkConf()
                .setMaster("local")
                .setAppName("Exercicio Nasa Logs")
                .set("spark.executor.memory", "5g"))

# 'type(obj)': função nativa do Python que retorna o tipo do objeto, útil para checagens rápidas.
type(configuracao)

# Cria o SparkContext com a configuração acima.
# IMPORTANTE: deve haver apenas UM SparkContext por aplicação.
sc = SparkContext(conf = configuracao)

# Confirma que 'sc' é um SparkContext.
type(sc)

In [None]:
# ========================== CÉLULA 4 ==========================
# Lê os arquivos como RDDs de strings (cada elemento = 1 linha do arquivo).
# 'sc.textFile(path)' é LAZY: a leitura real ocorre apenas quando executamos uma ACTION.
julho = sc.textFile('aulapython/NASA_access_log_Jul95')
agosto = sc.textFile('aulapython/NASA_access_log_Aug95')

# 'cache()': mantém o RDD em memória após a primeira computação.
# Isso evita recomputar a cadeia de transformações em AÇÕES subsequentes,
# acelerando o workflow.
julho = julho.cache()
agosto = agosto.cache()

# Inspeção via shell das primeiras linhas do arquivo de julho (não é Spark).
! head -n3 aulapython/NASA_access_log_Jul95

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085


In [None]:
# ========================== CÉLULA 5 ==========================
# Exemplo de linha de log (string). Isso facilita explicar parsing e tokens.
exemplo = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'

# Checa o tipo do objeto (deve ser 'str').
type(exemplo)

# 'split()' (sem argumentos) separa por QUALQUER espaço em branco.
operacao = exemplo.split()

print(operacao)   # Lista de tokens da linha
print(operacao[0])# 'operacao[0]' usa INDEXAÇÃO por '[]' para obter o 1º item (host/IP)

# Mapeia as primeiras 5 linhas do RDD 'julho' para o 1º token (host) e coleta para inspeção.
julho.map(lambda line: line.split()[0]).take(5)

# Variante com separador explícito: split(' ') (equivalente neste caso).
# 'distinct()' remove duplicatas via shuffle.
# 'count()' é ACTION: dispara a execução e retorna a contagem de elementos (hosts únicos).
julho.map(lambda line: line.split(' ')[0]).distinct().count()

['199.72.81.55', '-', '-', '[01/Jul/1995:00:00:01', '-0400]', '"GET', '/history/apollo/', 'HTTP/1.0"', '200', '6245']
199.72.81.55


81983

In [None]:
# ========================== CÉLULA 6 ==========================
def obterQtdHosts(rdd):
  """
  Retorna a quantidade de hosts distintos em um RDD de linhas de log.
  Etapas:
    1) line.split(' ')[0] -> extrai o host/IP (1º token).
    2) distinct() -> remove host repetido.
    3) count() -> ACTION que retorna a quantidade de elementos únicos.
  """
  qtdHosts = rdd.map(lambda line: line.split(' ')[0]).distinct().count()
  return qtdHosts

# Número de hosts distintos em Julho (calculado diretamente)
contagem_julho = julho.map(lambda line: line.split(' ')[0]).distinct().count()
print("Numero de hosts distintos no mes de Julho:", contagem_julho)

# Usando a função
obterQtdHosts(julho)

# Número de hosts distintos em Agosto
contagem_agosto = agosto.map(lambda line: line.split(' ')[0]).distinct().count()
print("Numero de hosts distintos no mes de Agosto:", contagem_agosto)

# Usando a função
obterQtdHosts(agosto)

Numero de hosts distintos no mes de Julho: 81983
Numero de hosts distintos no mes de Agosto: 75060


75060

In [None]:
# ========================== CÉLULA 7 ==========================
def codigo404(linha):
    """
    Retorna True se o penúltimo token da linha for '404' (código HTTP de "Not Found").
    Estrutura típica de linha:
      ... "REQUEST" STATUS BYTES
      penúltimo token = STATUS
      último token    = BYTES (pode ser '-')
    """
    try:
        # 'linha.split()[-2]' pega o penúltimo token usando índice negativo.
        # Em Python, índices negativos contam a partir do fim: [-1] último, [-2] penúltimo.
        codigohttp = linha.split()[-2]
        if codigohttp == '404':
            return True
    except:
        # Linhas malformadas (sem tokens suficientes) ou erros de parsing caem aqui.
        pass
    return False

# Teste da função (esta linha tem status 200, então deve retornar False)
codigo404(exemplo)

# Apenas para relembrar o conteúdo
exemplo

print(exemplo)

# Linha sintética com status 404 (para validar True)
exemplo2 = '199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 404 6245'
codigo404(exemplo2)  # Esperado: True

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245


True

In [None]:
# ========================== CÉLULA 8 ==========================
# Filtra linhas de julho que têm status 404 usando a função robusta 'codigo404'.
# 'cache()' para reutilizar os resultados sem recomputar o filtro.
erros404_julho = julho.filter(codigo404).cache()

# Em agosto usamos uma lambda equivalente, mas MENOS robusta (assume que sempre existe penúltimo token).
# Preferir a versão com try/except para tolerar linhas malformadas.
erros404_agosto = agosto.filter(lambda linha: linha.split(' ')[-2] == '404').cache()

# 'count()' é ACTION que dispara a execução e retorna a quantidade de elementos no RDD.
print('Erros 404 em Julho: %s' % erros404_julho.count())
print('Erros 404 em Agosto: %s' % erros404_agosto.count())

Erros 404 em Julho: 10845
Erros 404 em Agosto: 10056


In [None]:
# ========================== CÉLULA 9 ==========================
print(exemplo)
print(exemplo.split('"'))                 # Quebra a linha nas aspas -> ["antes", 'GET /... HTTP/1.x', "depois"]
print(exemplo.split('"')[1].split(' ')[1])# Pega o bloco do meio (índice 1) e, dele, o 2º token => a URL

# Extrai apenas a URL das linhas 404 de julho (amostra de 5)
erros404_julho.map(lambda linha: linha.split('"')[1].split(' ')[1]).take(5)

# Mapeia cada URL para o par (url, 1), ou seja, 1 ocorrência por linha/URL. Amostra.
erros404_julho.map(lambda linha: linha.split('"')[1].split(' ')[1]) \
              .map(lambda urls: (urls, 1)) \
              .take(5)

# Pipeline completo: URLs -> (url, 1) -> contagem por chave com reduceByKey(add)
urls = erros404_julho.map(lambda linha: linha.split('"')[1].split(' ')[1])
counts = urls.map(lambda urls: (urls, 1)).reduceByKey(add)

counts.take(10)  # amostra de 10 pares (url, contagem)
type(counts)     # RDD[Tuple[str, int]]

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
['199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] ', 'GET /history/apollo/ HTTP/1.0', ' 200 6245']
/history/apollo/


In [None]:
# ========================== CÉLULA 10 ==========================
def top5_hosts404(rdd):
    """
    Calcula as 5 URLs com mais erros 404 em um RDD de linhas de log 404.
    Passos:
      1) extrair URL: linha.split('"')[1].split(' ')[1]
      2) map para (url, 1)
      3) reduceByKey(add) para somar
      4) sortBy(lambda par: -par[1]) para ordenar por contagem desc
      5) take(5)
    """
    urls = rdd.map(lambda linha: linha.split('"')[1].split(' ')[1])   # '/history/apollo/'
    counts = urls.map(lambda urls: (urls, 1)).reduceByKey(add)        # (url, soma)
    top5 = counts.sortBy(lambda par: -par[1]).take(5)                 # ordena por -contagem (descendente)
    return top5

# Retorna lista com 5 tuplas (url, contagem), ordenadas por maior contagem
top5_hosts404(erros404_julho)
top5_hosts404(erros404_agosto)

[('/pub/winvn/readme.txt', 1337),
 ('/pub/winvn/release.txt', 1185),
 ('/shuttle/missions/STS-69/mission-STS-69.html', 683),
 ('/images/nasa-logo.gif', 319),
 ('/shuttle/missions/sts-68/ksc-upclose.gif', 253)]

In [None]:
# ========================== CÉLULA 11 ==========================
print(exemplo)                      # relembrando o formato geral
print(exemplo.split('[')[1])        # pega tudo após o primeiro '[': "01/Jul/1995:00:00:01 -0400]"
print(exemplo.split('[')[1].split(':')[0])  # fica só a parte "01/Jul/1995"

# Função para contar erros 404 por DIA (retorna lista Python de tuplas (dia, contagem)).
def contador_dias_404(rdd):
    """
    1) Extrai o dia no formato 'dd/Mon/yyyy' do trecho entre colchetes.
    2) Mapeia para (dia, 1) e soma com reduceByKey(add).
    3) collect() traz o resultado (lista de tuplas) para o Driver.
    """
    dias = rdd.map(lambda linha: linha.split('[')[1].split(':')[0])  # '01/Jul/1995'
    counts = dias.map(lambda dia: (dia, 1)).reduceByKey(add).collect()
    return counts

print(contador_dias_404(erros404_julho))  # lista Python de ("01/Jul/1995", 123), ...

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
01/Jul/1995
[('01/Jul/1995', 316), ('09/Jul/1995', 348), ('21/Jul/1995', 334), ('26/Jul/1995', 336), ('27/Jul/1995', 336), ('17/Jul/1995', 406), ('20/Jul/1995', 428), ('23/Jul/1995', 233), ('11/Jul/1995', 471), ('12/Jul/1995', 471), ('03/Jul/1995', 474), ('05/Jul/1995', 497), ('07/Jul/1995', 570), ('22/Jul/1995', 192), ('28/Jul/1995', 94), ('02/Jul/1995', 291), ('13/Jul/1995', 532), ('19/Jul/1995', 639), ('14/Jul/1995', 413), ('16/Jul/1995', 257), ('24/Jul/1995', 328), ('04/Jul/1995', 359), ('06/Jul/1995', 640), ('08/Jul/1995', 302), ('10/Jul/1995', 398), ('15/Jul/1995', 254), ('18/Jul/1995', 465), ('25/Jul/1995', 461)]


In [None]:
# ========================== CÉLULA 12 ==========================
# Exemplo bobo de ordenação alfabética com Python puro (sem Spark).
a = ("b", "g", "a", "d", "f", "c", "h", "e")
x = sorted(a)  # 'sorted' retorna NOVA lista ordenada; não altera 'a'.
print(x)

# Agora, ordenando os dias por quantidade de 404 (descendente) com Python puro:
# 'contador_dias_404(...)' já devolve uma LISTA Python de tuplas (dia, contagem).
# 'sorted(lista, key=..., reverse=...)' ordena essa lista.
#
# key=lambda x: -x[1]
# - 'lambda x: ...' cria uma função anônima.
# - 'x' é cada tupla (dia, contagem).
# - 'x[1]' é a CONTAGEM (2º elemento da tupla) — indexação via '[]'.
# - '-x[1]' usa o NEGATIVO da contagem para transformar a ordenação padrão (asc) em desc.
#
# Equivalente (mais legível): key=lambda x: x[1], reverse=True

sorted(contador_dias_404(erros404_julho), key=lambda x: -x[1])

# Ordenação por TEXTO (dia), desc: para strings, use reverse=True (não faz sentido negar string).
sorted(contador_dias_404(erros404_julho), key=lambda x: x[0], reverse=True)

# Ordenação por TEXTO (dia), asc:
sorted(contador_dias_404(erros404_julho), key=lambda x: x[0])

# Reaproveita a função para agosto (lista não ordenada):
contador_dias_404(erros404_agosto)

# Conversão de string para inteiro (exemplo auxiliar):
int('10')  # retorna 10 (int)

['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']


10

In [None]:
# ========================== CÉLULA 13 ==========================
def quantidade_bytes_acumulados(rdd):
    """
    Soma o campo de BYTES (último token) de cada linha.
    O campo pode ser '-' (sem valor) -> tratamos como 0.
    Também ignoramos valores inválidos via try/except.
    """
    def contador(linha):
        try:
            # 'linha.split(" ")[-1]' pega o ÚLTIMO token (índice -1) — BYTES.
            # int(...) converte de string para inteiro; se for '-', cai no except e volta 0.
            count = int(linha.split(" ")[-1])
            if count < 0:
                # bytes negativos não fazem sentido — dispara exceção para tratar como 0.
                raise ValueError()
            return count
        except:
            # Retorna 0 para linhas malformadas ou com '-' em BYTES.
            return 0

    # 'map(contador)': aplica a função acima em cada linha, produzindo um RDD de inteiros (bytes).
    # 'reduce(add)': soma todos os inteiros de forma DISTRIBUÍDA (reduceByKey é para pares (k,v); aqui é reduce simples).
    count = rdd.map(contador).reduce(add)
    return count

print('Quantidade de bytes total em Julho: %s' % quantidade_bytes_acumulados(julho))
print('Quantidade de bytes total em Agosto: %s' % quantidade_bytes_acumulados(agosto))

Quantidade de bytes total em Julho: 38695973491
Quantidade de bytes total em Agosto: 26828341424


In [None]:
# ========================== CÉLULA 14 ==========================
# Encerra o contexto do Spark e libera recursos locais/cluster.
sc.stop()