# Análise de Dados com PySpark e MySQL

Este notebook demonstra como usar PySpark para ler um conjunto de arquivos de texto de mais de 500MB e inserir os dados linha a linha em uma tabela de banco de dados MySQL.

## Inicialização do Spark

Inicializando a sessão Spark e configurando a conexão com o MySQL.

In [1]:
pip install mysql-connector-python

Collecting mysql-connector-python
  Downloading mysql_connector_python-8.4.0-cp39-cp39-manylinux_2_17_x86_64.whl (19.4 MB)
[K     |████████████████████████████████| 19.4 MB 389 kB/s eta 0:00:01    |████▌                           | 2.7 MB 1.4 MB/s eta 0:00:12     |█████████████▉                  | 8.4 MB 1.5 MB/s eta 0:00:08     |█████████████████▉              | 10.9 MB 1.5 MB/s eta 0:00:06     |██████████████████████████████▌ | 18.5 MB 389 kB/s eta 0:00:03
[?25hInstalling collected packages: mysql-connector-python
Successfully installed mysql-connector-python-8.4.0
Note: you may need to restart the kernel to use updated packages.


In [2]:
import findspark
import mysql.connector
from pyspark.sql.session import SparkSession
from pyspark import SparkContext, SparkConf

In [3]:
# Inicializando Spark
findspark.init("/usr/spark-3.5.1/")

spark = (
    SparkSession.builder.appName("sparksubmit_test_app")
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .config("spark.sql.catalogImplementation", "hive")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/11 17:38:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/11 17:38:28 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [4]:
# criar um contexto de sessão do spark (cria um "programa")
sc = SparkContext.getOrCreate()

## Conectando/Configurando o MySQL

In [5]:
# Conexão com o MySQL
con = mysql.connector.connect(
    host="10.5.0.5",
    port=3306,
    user="user_hadoop",
    password="123456",
    database="db"
)

In [6]:
# Função para criar a tabela se não existir
def create_table_if_not_exists():
    cursor = con.cursor()
    table_creation_query = """
    CREATE TABLE IF NOT EXISTS text_data (
        id INT AUTO_INCREMENT PRIMARY KEY,
        texto TEXT
    )
    """
    cursor.execute(table_creation_query)
    cursor.close()

In [7]:
# Função para inserir os dados no MySQL
def insert_into_mysql(text):
    cursor = con.cursor()
    sql = "INSERT INTO text_data (texto) values (%s)"
    valores = (text,)
    cursor.execute(sql, valores)
    cursor.close()
    con.commit()

In [8]:
# Função para ler os textos do MySQL
def read_from_mysql():
    cursor = con.cursor()
    cursor.execute("SELECT texto FROM text_data")
    rows = cursor.fetchall()
    cursor.close()
    return rows

In [9]:
# Criar a tabela se não existir
create_table_if_not_exists()

## Leitura e Processamento dos Arquivos de Texto

Vamos ler os arquivos de texto maiores que 500MB e processar os dados.

In [12]:
# Caminho para os arquivos de texto
file_path = "hdfs://spark-master:9000/datasets/*.txt"
# Leitura dos arquivos de texto
text_files = spark.read.text(file_path)
# Inserindo cada linha do arquivo no MySQL
for row in text_files.collect():
    # Pegando o texto da linha
    text = row[0]
    # Inserir no MySQL
    insert_into_mysql(text)

# Leitura dos arquivos de texto
print("Terminei")

                                                                                

Terminei


## Contagem de Palavras

In [10]:
# Lendo os textos do MySQL
texts = read_from_mysql()

# Convertendo os textos em RDD
texts_rdd = spark.sparkContext.parallelize([row[0] for row in texts])

# Processamento das palavras
words = texts_rdd.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

# Salvando os resultados no HDFS
output_path = "hdfs://spark-master:9000/datasets/word_count"
word_counts.saveAsTextFile(output_path)

# Imprimindo os resultados
print(f"Total de palavras: {word_counts.count()}")

24/06/11 17:40:29 WARN TaskSetManager: Stage 0 contains a task of very large size (30416 KiB). The maximum recommended task size is 1000 KiB.

Total de palavras: 348459


                                                                                

## Finalizando a Sessão Spark

In [11]:
spark.stop()
con.close()

In [None]:
FIM :)