# Análise de Dados com PySpark e MySQL

Este notebook demonstra como usar PySpark para ler um conjunto de arquivos de texto de mais de 500MB e inserir os dados linha a linha em uma tabela de banco de dados MySQL.

## Inicialização do Spark

Vamos iniciar a sessão Spark e configurar a conexão com o MySQL.

In [1]:
pip install mysql-connector-python

Collecting mysql-connector-python
  Downloading mysql_connector_python-8.4.0-cp39-cp39-manylinux_2_17_x86_64.whl (19.4 MB)
[K     |████████████████████████████████| 19.4 MB 3.0 MB/s eta 0:00:01
[?25hInstalling collected packages: mysql-connector-python
Successfully installed mysql-connector-python-8.4.0
Note: you may need to restart the kernel to use updated packages.


In [None]:
import findspark
import mysql.connector
from pyspark.sql.session import SparkSession
from pyspark import SparkContext, SparkConf

In [None]:
# Inicializando Spark
findspark.init("/usr/spark-3.5.1/")

spark = (
    SparkSession.builder.appName("sparksubmit_test_app")
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .config("spark.sql.catalogImplementation", "hive")
    .getOrCreate()
)

In [None]:
# criar um contexto de sessão do spark (cria um "programa")
sc = SparkContext.getOrCreate()

## Conectando ao MySQL

Vamos configurar a conexão com o banco de dados MySQL.

In [None]:
# Conexão com o MySQL
con = mysql.connector.connect(
    host="10.5.0.5",
    port=3306,
    user="hadoop",
    password="123456",
    database="db"
)

In [None]:
# Função para criar a tabela se não existir
def create_table_if_not_exists():
    cursor = con.cursor()
    table_creation_query = """
    CREATE TABLE IF NOT EXISTS text_data (
        id INT AUTO_INCREMENT PRIMARY KEY,
        texto TEXT
    )
    """
    cursor.execute(table_creation_query)
    cursor.close()

In [None]:
# Função para inserir os dados no MySQL
def insert_into_mysql(text):
    cursor = con.cursor()
    sql = "INSERT INTO text_data (texto) values (%s)"
    valores = (text,)
    cursor.execute(sql, valores)
    cursor.close()
    con.commit()

In [None]:
# Função para ler os textos do MySQL
def read_from_mysql():
    cursor = con.cursor()
    cursor.execute("SELECT texto FROM text_data")
    rows = cursor.fetchall()
    cursor.close()
    return rows

In [None]:
# Criar a tabela se não existir
create_table_if_not_exists()

## Leitura e Processamento dos Arquivos de Texto

Vamos ler os arquivos de texto maiores que 500MB e processar os dados.

In [None]:
# Caminho para os arquivos de texto
file_path = "hdfs://spark-master:9000/datasets/*.txt"

# Leitura dos arquivos de texto
text_files = spark.read.text(file_path)

# Inserindo cada linha do arquivo no MySQL
for row in text_files.collect():
    # Pegando o texto da linha
    text = row[0]
    # Inserir no MySQL
    insert_into_mysql(text)

## Contagem de Palavras

Vamos contar as palavras nos arquivos de texto.

In [None]:
# Lendo os textos do MySQL
texts = read_from_mysql()

# Convertendo os textos em RDD
texts_rdd = spark.sparkContext.parallelize([row[0] for row in texts])

# Processamento das palavras
words = texts_rdd.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# Salvando os resultados no HDFS
output_path = "hdfs://spark-master:9000/datasets/word_count"
word_counts.saveAsTextFile(output_path)

# Imprimindo os resultados
print(f"Total de palavras: {word_counts.count()}")

## Finalizando a Sessão Spark

Vamos parar a sessão Spark.

In [None]:
spark.stop()
con.close()