<a href="https://colab.research.google.com/github/PJbourne/Data_Science_course/blob/main/Tasks_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Análise de dados utilizando RDD

In [None]:
from pyspark.sql import SparkSession

# Criar SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
sc = spark.sparkContext

# Carregar o arquivo como RDD
!curl -O https://raw.githubusercontent.com/farsilvar/data-mining-big-data-files/refs/heads/main/fake_web_logs.txt >> fake_web_logs.txt
logsRDD = sc.textFile("fake_web_logs.txt")

# Ver as 5 primeiras linhas
logsRDD.take(5)

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  6618  100  6618    0     0  38088      0 --:--:-- --:--:-- --:--:-- 38254


['192.168.0.2 - - [08/Jun/2024:09:47:40] "GET /contact HTTP/1.1" 200',
 '192.168.0.2 - - [08/Jun/2024:09:37:21] "GET /about HTTP/1.1" 500',
 '192.168.0.5 - - [08/Jun/2024:09:50:15] "POST / HTTP/1.1" 404',
 '192.168.0.5 - - [08/Jun/2024:09:08:56] "POST /cart HTTP/1.1" 200',
 '192.168.0.1 - - [08/Jun/2024:09:10:02] "POST /contact HTTP/1.1" 200']

In [None]:
def http_method(line):
    """Return the HTTP method of a log line, or '' if the line has too few fields.

    For a line like
        192.168.0.2 - - [08/Jun/2024:09:47:40] "GET /contact HTTP/1.1" 200
    the method is the 5th whitespace-separated field ('"GET'), minus its quote.
    Comparing this field exactly avoids matching "GET"/"POST" that merely
    appear somewhere else in the line (e.g. inside a path or query string).
    """
    fields = line.split()
    return fields[4].lstrip('"') if len(fields) > 4 else ""

# Keep only GET requests
getRDD = logsRDD.filter(lambda line: http_method(line) == "GET")

# Keep only POST requests
postRDD = logsRDD.filter(lambda line: http_method(line) == "POST")

In [None]:
# Number of lines in getRDD, i.e. the lines kept by the GET filter.
getRDD.count()

46

In [None]:
# Number of lines in postRDD, i.e. the lines kept by the POST filter.
postRDD.count()

54

In [None]:
# Client IP = first whitespace-separated field of every log line.
ipsRDD = logsRDD.map(lambda entry: entry.split()[0])

# Per-IP access tally: emit an (ip, 1) pair per line and sum the ones per key.
ipCounts = (
    ipsRDD
    .map(lambda addr: (addr, 1))
    .reduceByKey(lambda total, one: total + one)
)

In [None]:
# Count requests per HTTP method (GET / POST).
# For a line such as
#   192.168.0.2 - - [08/Jun/2024:09:47:40] "GET /contact HTTP/1.1" 200
# line.split() yields: 0=ip, 1='-', 2='-', 3=timestamp, 4='"GET',
# 5=endpoint, 6='HTTP/1.1"', 7=status. The method is therefore field 4 with
# its leading quote stripped; field 5 is the endpoint.
get_post_counts = (
    logsRDD
    .map(lambda line: (line.split()[4].lstrip('"'), 1))
    .reduceByKey(lambda a, b: a + b)
)

# Requests per endpoint (field 5).
endpoint_counts = (
    logsRDD
    .map(lambda line: (line.split()[5], 1))
    .reduceByKey(lambda a, b: a + b)
)

# Accesses per IP (field 0). Same tally as ipCounts, computed directly from
# logsRDD so this cell does not depend on that one.
ip_access_counts = (
    logsRDD
    .map(lambda line: (line.split()[0], 1))
    .reduceByKey(lambda a, b: a + b)
)

In [None]:
# Preview up to 5 (key, count) pairs.
get_post_counts.take(num=5)

[('/about', 13),
 ('/', 19),
 ('/cart', 13),
 ('/index.html', 14),
 ('/contact', 20)]

In [None]:
# Mark the per-IP RDD for caching so repeated actions can reuse it;
# cache() returns the RDD itself.
cache_count = ip_access_counts.cache()

# The first action computes the RDD and populates the cache; the second
# action can then be served from the cached partitions.
first_count = cache_count.count()
second_count = cache_count.count()

print(first_count)
print(second_count)

5
5


In [None]:
# Preview 5 (ip, count) pairs from the cached RDD.
cache_count.take(num=5)

[('192.168.0.2', 22),
 ('192.168.0.1', 24),
 ('192.168.0.3', 22),
 ('192.168.0.5', 18),
 ('192.168.0.4', 14)]

## Análise de dados utilizando Spark DataFrames

In [None]:
from pyspark.sql import SparkSession

# Get the SparkSession for the DataFrame section. getOrCreate() returns the
# already-running session if one exists (e.g. from the RDD section above), so
# this cell is safe to re-run and also lets this section be run on its own.
spark = (
    SparkSession.builder
    .appName("Python Spark DataFrames")
    .getOrCreate()
)

In [None]:
# Load the raw log as a DataFrame with a single string column, 'value',
# holding one line of the file per row.
df_logs = spark.read.format("text").load("fake_web_logs.txt")

df_logs.show(n=5, truncate=False)

+-------------------------------------------------------------------+
|value                                                              |
+-------------------------------------------------------------------+
|192.168.0.2 - - [08/Jun/2024:09:47:40] "GET /contact HTTP/1.1" 200 |
|192.168.0.2 - - [08/Jun/2024:09:37:21] "GET /about HTTP/1.1" 500   |
|192.168.0.5 - - [08/Jun/2024:09:50:15] "POST / HTTP/1.1" 404       |
|192.168.0.5 - - [08/Jun/2024:09:08:56] "POST /cart HTTP/1.1" 200   |
|192.168.0.1 - - [08/Jun/2024:09:10:02] "POST /contact HTTP/1.1" 200|
+-------------------------------------------------------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import regexp_extract

# Per-field patterns for lines like:
#   192.168.0.2 - - [08/Jun/2024:09:47:40] "GET /contact HTTP/1.1" 200
# regexp_extract returns "" when a pattern does not match, so a malformed
# line yields empty strings (and, with non-ANSI casting, a NULL status).
IP_PATTERN = r"^(\S+)"
METHOD_PATTERN = r"\[.*?\]\s\"([A-Z]+)"
ENDPOINT_PATTERN = r"\s(\S+)\sHTTP"
# Allow trailing whitespace after the status code. Otherwise such a line gets
# a NULL status and silently drops out of status-based filters.
STATUS_PATTERN = r"\s(\d{3})\s*$"

# Extract IP, method, endpoint and status in a single select.
df_logs_processed = df_logs.select(
    regexp_extract("value", IP_PATTERN, 1).alias("ip"),
    regexp_extract("value", METHOD_PATTERN, 1).alias("method"),
    regexp_extract("value", ENDPOINT_PATTERN, 1).alias("endpoint"),
    regexp_extract("value", STATUS_PATTERN, 1).cast("integer").alias("status"),
)

# Show the first 5 rows with the extracted columns.
df_logs_processed.show(5, truncate=False)

+-----------+------+--------+------+
|ip         |method|endpoint|status|
+-----------+------+--------+------+
|192.168.0.2|GET   |/contact|200   |
|192.168.0.2|GET   |/about  |500   |
|192.168.0.5|POST  |/       |404   |
|192.168.0.5|POST  |/cart   |200   |
|192.168.0.1|POST  |/contact|200   |
+-----------+------+--------+------+
only showing top 5 rows



In [None]:
# Count GET and POST requests: one row per extracted method with its count.
get_post_spark = df_logs_processed.groupBy(df_logs_processed["method"]).count()
get_post_spark.show()

+------+-----+
|method|count|
+------+-----+
|  POST|   54|
|   GET|   46|
+------+-----+



In [None]:
# Count requests per client IP.
ip_request_count = df_logs_processed.groupBy(df_logs_processed["ip"]).count()
ip_request_count.show()

+-----------+-----+
|         ip|count|
+-----------+-----+
|192.168.0.2|   22|
|192.168.0.1|   24|
|192.168.0.3|   22|
|192.168.0.5|   18|
|192.168.0.4|   14|
+-----------+-----+



In [None]:
# Count POST requests per client IP: keep POST rows, then group by IP.
ip_post_request_count = (
    df_logs_processed
    .where(df_logs_processed["method"] == "POST")
    .groupBy("ip")
    .count()
)
ip_post_request_count.show()

+-----------+-----+
|         ip|count|
+-----------+-----+
|192.168.0.2|   11|
|192.168.0.1|   15|
|192.168.0.3|   11|
|192.168.0.5|    7|
|192.168.0.4|   10|
+-----------+-----+



In [None]:
# Number of requests answered with HTTP status 200.
status_200_count = (
    df_logs_processed
    .where(df_logs_processed["status"] == 200)
    .count()
)
status_200_count

56