In [None]:
# Instale o PySpark no Google Colab
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=031c27e1d6b19988b98c7a8da1086d9447113574eceddf8ef89834fb07832288
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [None]:
from pyspark.sql import SparkSession

# Inicialize a sessão Spark
spark = SparkSession.builder.appName("PySparkTest").getOrCreate()

# Dados e colunas
data = [
    ("Alice", 34, "Data Scientist"),
    ("Bob", 45, "Data Engineer"),
    ("Cathy", 29, "Data Analyst"),
    ("David", 35, "Data Scientist")
]
columns = ["Name", "Age", "Occupation"]

# Criação do DataFrame
df = spark.createDataFrame(data, schema=columns)
df.show()


+-----+---+--------------+
| Name|Age|    Occupation|
+-----+---+--------------+
|Alice| 34|Data Scientist|
|  Bob| 45| Data Engineer|
|Cathy| 29|  Data Analyst|
|David| 35|Data Scientist|
+-----+---+--------------+



In [None]:
# Selecionar as colunas "Name" e "Age"
df_selected = df.select("Name", "Age")

# Filtrar as linhas onde a "Age" é maior que 30
df_filtered = df_selected.filter(df_selected["Age"] > 30)
df_filtered.show()


+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
|  Bob| 45|
|David| 35|
+-----+---+



In [None]:
from pyspark.sql.functions import avg

# Agrupar por "Occupation" e calcular a média de "Age"
df_grouped = df.groupBy("Occupation").agg(avg("Age").alias("Average_Age"))
df_grouped.show()


+--------------+-----------+
|    Occupation|Average_Age|
+--------------+-----------+
|Data Scientist|       34.5|
| Data Engineer|       45.0|
|  Data Analyst|       29.0|
+--------------+-----------+



In [None]:
# Ordenar por média de idade em ordem decrescente
df_sorted = df_grouped.orderBy(df_grouped["Average_Age"].desc())
df_sorted.show()


+--------------+-----------+
|    Occupation|Average_Age|
+--------------+-----------+
| Data Engineer|       45.0|
|Data Scientist|       34.5|
|  Data Analyst|       29.0|
+--------------+-----------+



In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Função Python para categorizar a idade
def categorize_age(age):
    if age < 30:
        return "Jovem"
    elif 30 <= age <= 40:
        return "Adulto"
    else:
        return "Senior"

# Converter a função em uma UDF
categorize_age_udf = udf(categorize_age, StringType())

# Aplicar a UDF no DataFrame
df_with_category = df.withColumn("Age_Category", categorize_age_udf(df["Age"]))
df_with_category.show()


+-----+---+--------------+------------+
| Name|Age|    Occupation|Age_Category|
+-----+---+--------------+------------+
|Alice| 34|Data Scientist|      Adulto|
|  Bob| 45| Data Engineer|      Senior|
|Cathy| 29|  Data Analyst|       Jovem|
|David| 35|Data Scientist|      Adulto|
+-----+---+--------------+------------+



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

# Definir a janela
windowSpec = Window.partitionBy("Occupation")

# Calcular a média de idade por "Occupation" e a diferença de idade
df_with_window = df.withColumn("Avg_Age_Occupation", avg("Age").over(windowSpec))
df_with_window = df_with_window.withColumn("Age_Difference", df_with_window["Age"] - df_with_window["Avg_Age_Occupation"])
df_with_window.show()


+-----+---+--------------+------------------+--------------+
| Name|Age|    Occupation|Avg_Age_Occupation|Age_Difference|
+-----+---+--------------+------------------+--------------+
|Cathy| 29|  Data Analyst|              29.0|           0.0|
|  Bob| 45| Data Engineer|              45.0|           0.0|
|Alice| 34|Data Scientist|              34.5|          -0.5|
|David| 35|Data Scientist|              34.5|           0.5|
+-----+---+--------------+------------------+--------------+



In [None]:
# Exemplo de particionamento por coluna "Occupation"
df_partitioned = df.repartition(3, "Occupation")
df_partitioned.write.partitionBy("Occupation").parquet("/content/occupation_partitioned.parquet")


In [None]:
from pyspark.sql.functions import broadcast

# Criar DataFrames para o join
small_df = spark.createDataFrame([("Data Scientist", "DS"), ("Data Engineer", "DE")], ["Occupation", "Abbreviation"])
large_df = df

# Aplicar o Broadcast Join
result_df = large_df.join(broadcast(small_df), "Occupation")
result_df.show()


+--------------+-----+---+------------+
|    Occupation| Name|Age|Abbreviation|
+--------------+-----+---+------------+
|Data Scientist|Alice| 34|          DS|
| Data Engineer|  Bob| 45|          DE|
|Data Scientist|David| 35|          DS|
+--------------+-----+---+------------+



## Integração com Outras Tecnologias

**Leitura de um arquivo CSV**

Para ler um arquivo CSV em PySpark, usamos o método read.csv() da classe SparkSession. Este método permite carregar dados de um arquivo CSV em um DataFrame do PySpark, facilitando o processamento e a análise de grandes volumes de dados.

Atribuição: \\
1) *header=True*: Informa ao PySpark que o primeiro registro no arquivo CSV é o cabeçalho. \\
2) *inferSchema=True*: Solicita ao PySpark que infira automaticamente o tipo de dado de cada coluna (e.g., inteiro, string, etc.).

In [None]:
# Leitura de um arquivo CSV no Google Colab
df_csv = spark.read.csv("/content/sample.csv", header=True, inferSchema=True)

# Mostrar o conteúdo do DataFrame
df_csv.show()


**Escrita em formato Parquet**

O formato Parquet é um formato de armazenamento em colunas que é eficiente tanto em termos de espaço quanto de velocidade de leitura/escrita. O PySpark oferece suporte para a escrita de DataFrames em Parquet usando o método *write.parquet()*.

In [None]:
# Escrita do DataFrame em formato Parquet
df_csv.write.parquet("/content/output.parquet")


**Integração do PySpark com o Hadoop HDFS**

O Hadoop Distributed File System (HDFS) é um sistema de arquivos distribuído utilizado em ambientes Hadoop para armazenar grandes volumes de dados. O PySpark se integra perfeitamente ao HDFS, permitindo a leitura e escrita de dados diretamente no sistema distribuído.

Para trabalhar com HDFS no PySpark, é necessário estar em um ambiente que tenha acesso a um cluster Hadoop, o que normalmente envolve configuração de um ambiente Hadoop ou a utilização de uma plataforma de nuvem que suporte Hadoop.

**Leitura de um arquivo do HDFS**

Atribuições:

1) hdfs://namenode_host:port/: Esta parte da URL define o nome do nó (NameNode) e a porta do HDFS que será usada. Deve ser substituída pela URL real do seu ambiente Hadoop. \\
2) O resto do caminho (/path/to/input.csv) especifica o local do arquivo no HDFS.

In [None]:
# Leitura de um arquivo CSV a partir do HDFS
df_hdfs = spark.read.csv("hdfs://namenode_host:port/path/to/input.csv", header=True, inferSchema=True)

# Mostrar o conteúdo do DataFrame
df_hdfs.show()


Escrita de um arquivo de volta ao HDFS

Depois de processar os dados, você pode salvá-los de volta no HDFS em diferentes formatos, como Parquet, CSV, etc. Aqui está um exemplo de como salvar os dados no HDFS em formato Parquet:



In [None]:
# Escrita do DataFrame no HDFS em formato Parquet
df_hdfs.write.parquet("hdfs://namenode_host:port/path/to/output.parquet")


## Problema de Caso

**Processamento de Logs**

Neste problema, você temos um grande arquivo de log com as seguintes colunas: "timestamp", "user_id", "action". Cada linha representa uma ação realizada por um usuário em um determinado momento. A tarefa é processar este arquivo de log usando PySpark para extrair insights.

**Carregue o arquivo de log em um DataFrame **

Primeiro, carregamos o arquivo de log em um DataFrame usando a função *read.csv()*:


Atribuições: \\
1) timestamp: Representa o momento em que a ação foi realizada. \\
2) user_id: Um identificador único para cada usuário. \\
3) action: A ação realizada pelo usuário.

In [None]:
# Carregar o arquivo de log em um DataFrame
df_log = spark.read.csv("/content/logfile.csv", header=True, inferSchema=True)

# Mostrar o conteúdo do DataFrame
df_log.show()


**Conte o número de ações realizadas por cada usuário**

Usamos o método *groupBy()* para agrupar as ações pelo* user_id* e a função *count()* para contar o número de ações realizadas por cada usuário.

Atribuições:

1) *groupBy("user_id"):* Agrupa os registros com base no identificador do usuário. \\
2) *count():* Conta o número de ações realizadas por cada usuário.

In [None]:
# Contar o número de ações por usuário
df_user_actions = df_log.groupBy("user_id").count()

# Mostrar o resultado
df_user_actions.show()


**Encontre os 10 usuários mais ativos**

Depois de contar as ações, podemos ordenar os usuários pelo número de ações em ordem decrescente e selecionar os 10 mais ativos:

Atribuições:

1) *orderBy(df_user_actions["count"].desc())*: Ordena os usuários pelo número de ações em ordem decrescente.

2) *limit(10)*: Seleciona apenas os 10 primeiros usuários da lista.

In [None]:
# Encontrar os 10 usuários mais ativos
df_top_users = df_user_actions.orderBy(df_user_actions["count"].desc()).limit(10)

# Mostrar os usuários mais ativos
df_top_users.show()


Salve o resultado em um arquivo CSV

Finalmente, salvamos o resultado dos 10 usuários mais ativos em um arquivo CSV:

In [None]:
# Salvar o resultado em um arquivo CSV
df_top_users.write.csv("/content/top_users.csv")


**Referências**

1) Zaharia, M., Chowdhury, M., Franklin, M. J., Shenker, S., & Stoica, I. (2010). "Spark: Cluster computing with working sets." In Proceedings of the 2nd USENIX conference on Hot topics in cloud computing (Vol. 10, No. 10-10, p. 95).


2) Armbrust, M., Das, T., & Xin, R. S. et al. (2015). "Spark SQL: Relational Data Processing in Spark." In Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data (pp. 1383-1394).

3) Karau, H., Warren, R., Wendell, P., & Zaharia, M. (2017). Learning Spark: Lightning-fast big data analysis. O'Reilly Media.


4) White, T. (2012). Hadoop: The Definitive Guide. O'Reilly Media.


5) Meng, X., Bradley, J., Yavuz, B., Sparks, E., Venkataraman, S., Liu, D., ... & Zaharia, M. (2016). "MLlib: Machine learning in Apache Spark." Journal of Machine Learning Research, 17(1), 1235-1241.


6) Guller, M. (2015). Big Data Analytics with Spark: A Practitioner’s Guide to Using Spark for Large Scale Data Analysis. Apress.
