## *Aula 1: Trabalhe com RDDs*
- Aqui aprendi a usar as funções: **SparkContext, getOrCreate, parallelize, lambda, getNumPartitions, first, take, collect, map, upper, lower e stop**

In [None]:
!pip install pyspark # Sempre realizar o processo de instalação do pyspark no inicio



In [None]:
import pyspark
from pyspark import SparkContext # Aqui é o Core do Spark

In [None]:
sc = SparkContext.getOrCreate() # Utilizando o Core do Spark

In [None]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) # Paralelizando uma lista com 10 numeros

In [None]:
rdd.getNumPartitions() # Ver em quantos pedaços o arquivo está dividido
# O Spark divide o arquivo de forma automática, nesse caso a lista foi dividida em 2 partes


2

In [None]:
rdd.first() # Função para ver a primeira linha

1

In [None]:
rdd.take(4) # Função para buscar um valor espeçífico, neste caso os 4 primeiros

[1, 2, 3, 4]

In [None]:
rdd.collect() # Cuidado ao usar essa função pois ela pega todo o bigdata que voce está analisando e traz tudo

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

In [None]:
rddtexto = sc.parallelize(["Big Data", "Machine Learning", "Data Science"])

In [None]:
# Existem duas funções interessantes: map e flatmap. map lê os valores separados por virgula e flatmap lê todos os caracteres

In [None]:
rddminuscula = rddtexto.map(lambda x: x.lower()) # Sempre use o Lambda, muito melhor que criar funções
rddminuscula.take(3)

['big data', 'machine learning', 'data science']

In [None]:
sc.stop()

## *Aula 2: Transformações e ações em RDDs*
- Aqui aprendi a usar as funções: **SparkConf, textFile, count, flatmap, split, filter, startsWith, reduceByKey e saveAsTextFile**

In [None]:
from pyspark import SparkConf # Função mais utilizada pelos engenheiros de dados

In [None]:
conf = SparkConf().setAppName("Leitura de Aquivo Texto") # Uma das opções dessa função é dar nome para a aplicação
sc = SparkContext(conf=conf).getOrCreate() # Foi adicionado a configuração de nome do aplicativo no SparkContext

In [None]:
rdd = sc.textFile("sample_data/README.md") # Peguei um arquivo de texto salvo coma função textFile

In [None]:
rdd.count() # Contando a quantidade de linhas

19

In [None]:
rdd.take(7)

['This directory includes a few sample datasets to get you started.',
 '',
 '*   `california_housing_data*.csv` is California housing data from the 1990 US',
 '    Census; more information is available at:',
 '    https://docs.google.com/document/d/e/2PACX-1vRhYtsvc5eOR2FWNCwaBiKL6suIOrxJig8LcSBbmCbyYsayia_DvPOOBlXZ4CAlQ5nlDD8kTaIDRwrN/pub',
 '',
 '*   `mnist_*.csv` is a small sample of the']

In [None]:
palavra = rdd.flatMap(lambda x:x.split(" ")) #Queremos contar palavras e precisamos separar palavra por palavra
# Utilizei o flatmap para lêr letra por letra até encontrar um espaço (cuja função é separar palavras)
# Utilizando a função split é possivel agora separar essas palavras sempre que o flatmap encontrar um espaço

In [None]:
palavraminuscula = palavra.map(lambda x: x.lower())
print("Map: ", palavraminuscula.take(5) )

Map:  ['this', 'directory', 'includes', 'a', 'few']


In [None]:
letrasmaiuscula = palavra.flatMap(lambda x: x.upper())
print("FlatMap: ", letrasmaiuscula.take(5))

FlatMap:  ['T', 'H', 'I', 'S', 'D']


In [None]:
palavracomeçaT = palavraminuscula.filter(lambda x: x.startswith("t"))
print("Filter: ", palavracomeçaT.take(5))

Filter:  ['this', 'to', 'the', 'the', 'the']


In [None]:
palavramin2caracteres = palavraminuscula.filter(lambda x: len(x) >= 2)
print("Filter: ", palavramin2caracteres.take(5))

Filter:  ['this', 'directory', 'includes', 'few', 'sample']


In [None]:
palavrachavevalor = palavraminuscula.map(lambda x: (x, 1)) # Precisa-se da chave e do valor para futura contagem
palavrachavevalor.take(5)

[('this', 1), ('directory', 1), ('includes', 1), ('a', 1), ('few', 1)]

In [None]:
palavracontar = palavrachavevalor.reduceByKey(lambda x,y: x+y) # Função para contar palavras
palavracontar.take(50)

[('this', 1),
 ('sample', 2),
 ('datasets', 1),
 ('to', 1),
 ('you', 1),
 ('started.', 1),
 ('', 51),
 ('*', 3),
 ('`california_housing_data*.csv`', 1),
 ('from', 1),
 ('1990', 1),
 ('us', 1),
 ('more', 1),
 ('information', 1),
 ('available', 1),
 ('small', 1),
 ('of', 2),
 ('database](https://en.wikipedia.org/wiki/mnist_database),', 1),
 ('described', 2),
 ("[anscombe's", 1),
 ('it', 1),
 ('j.', 1),
 ('statistical', 1),
 ("analysis'.", 1),
 ('american', 1),
 ('jstor', 1),
 ('2682899.', 1),
 ('and', 1),
 ('our', 1),
 ('by', 1),
 ('directory', 1),
 ('includes', 1),
 ('a', 3),
 ('few', 1),
 ('get', 1),
 ('is', 4),
 ('california', 1),
 ('housing', 1),
 ('data', 1),
 ('the', 3),
 ('census;', 1),
 ('at:', 2),
 ('https://docs.google.com/document/d/e/2pacx-1vrhytsvc5eor2fwncwabikl6suiorxjig8lcsbbmcbyysayia_dvpooblxz4calq5nldd8ktaidrwrn/pub',
  1),
 ('`mnist_*.csv`', 1),
 ('[mnist', 1),
 ('which', 1),
 ('http://yann.lecun.com/exdb/mnist/', 1),
 ('`anscombe.json`', 1),
 ('contains', 1),
 ('copy

In [None]:
palavracontar.saveAsTextFile("Contar_Palavras") # Função para salvar o arquivo

In [None]:
!ls Contar_Palavras/

part-00000  part-00001	_SUCCESS


In [None]:
sc.stop()

## *Aula 3: Leitura de Arquivo*


- Aprendi a usar as funções: *SparkSession, builder, read, show, printSchema, option, header, inferSchema, mode 'DROPMALFORMED'*



In [None]:
from pyspark.sql import SparkSession # Melhor que o SparkContext
from pyspark.sql.types import *
from datetime import date

# A SparkSession engloba o SparkContext e fornece uma interface mais simples para trabalhar com DataFrames, Datasets e SQL, substituindo a necessidade de
# criar instâncias separadas de SQLContext ou HiveContext

In [None]:
spark = SparkSession.builder.getOrCreate()

In [None]:
df = spark.read.csv("clientes_tratados.csv") # Fantando especificar que tem cabeçalho

In [None]:
df.take(4) # Visualização de linha por linha

[Row(nome='Arthur Montenegro', cpf='385.***.***-80', idade='29.0', data='1995-08-31', endereco='vereda isabela almeida, 8', bairro='pantanal', estado='ES'),
 Row(nome='Sra. Ágatha Cavalcanti', cpf='952.***.***-85', idade='43.0', data='1982-06-25', endereco='estrada de da luz', bairro='providencia', estado='CE'),
 Row(nome='Calebe Da Cruz', cpf='746.***.***-09', idade='54.0', data='1971-10-17', endereco='ladeira gael marques, 25', bairro='carlos prates', estado='PA'),
 Row(nome='Maysa Costa', cpf='914.***.***-33', idade='39.0', data='1986-10-14', endereco='praça silva, 4', bairro='suzana', estado='PI')]

In [None]:
df.show(3) # A função show traz uma visualização em formato de tabela

+--------------------+--------------+-----+----------+--------------------+-------------+------+
|                nome|           cpf|idade|      data|            endereco|       bairro|estado|
+--------------------+--------------+-----+----------+--------------------+-------------+------+
|   Arthur Montenegro|385.***.***-80| 29.0|1995-08-31|vereda isabela al...|     pantanal|    ES|
|Sra. Ágatha Caval...|952.***.***-85| 43.0|1982-06-25|   estrada de da luz|  providencia|    CE|
|      Calebe Da Cruz|746.***.***-09| 54.0|1971-10-17|ladeira gael marq...|carlos prates|    PA|
+--------------------+--------------+-----+----------+--------------------+-------------+------+
only showing top 3 rows


In [None]:
df.printSchema()

root
 |-- nome: string (nullable = true)
 |-- cpf: string (nullable = true)
 |-- idade: double (nullable = true)
 |-- data: date (nullable = true)
 |-- endereco: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- estado: string (nullable = true)



In [None]:
df = spark.read.option("header","true").csv("clientes_tratados.csv") # Especificando que contém cabeçalho
# Após rodar esse código, inicie novamente os codigos de take, show e printschema acima

In [None]:
df = spark.read.option("header","true").option("inferSchema","true").csv("clientes_tratados.csv") # InferSchema serve para colocar umm formato automático nas colunas
# Observe que no printSchema acima a coluna idade e data está em formato string, depois de rodar este codigo reinicie o printSchemma

In [None]:
# Vamos aprender mais sobre alguns options que podemos adicionar junto ao código de leitura da base de dados

# função que define o separador dos valores
# função que define o cabeçalho
# função que remove automaticamente linhas corrompidas ou mal estruturadas
# função de define automaticamente o formato das colunas

df = spark.read.\
option("sep",",").\
option("header","true").\
option("mode","DROPMALFORMED").\
option("inferSchema","true").\
csv("clientes_tratados.csv")

## *Aula 4: Estrutura do Arquivo*
- Aprendi a usar as funções: *StructField, StringTrype, FloatType, DateType, schema e createDataFrame*

In [None]:
# Para o caso de o schema automático não funcionar corretamente, você pode criar seu próprio schema
listacampos = [
    StructField("nome", StringType()),
    StructField("cpf", StringType()),
    StructField("idade", FloatType()),
    StructField("data", DateType()),
    StructField("endereco", StringType()),
    StructField("bairro", StringType()),
    StructField("estado", StringType())
]
schema_definido = StructType(listacampos) # StructField define os detahes de uma única coluna. StructType define a estrutura total.

In [None]:
df = spark.read.option("header","true").schema(schema_definido).csv("clientes_tratados.csv")

In [None]:
df.printSchema()

root
 |-- nome: string (nullable = true)
 |-- cpf: string (nullable = true)
 |-- idade: float (nullable = true)
 |-- data: date (nullable = true)
 |-- endereco: float (nullable = true)
 |-- bairro: string (nullable = true)
 |-- estado: string (nullable = true)



In [None]:
# Para criar uma base de dados:
lista_teste = [
    Row(nome='Arthur Montenegro', cpf='385.***.***-80', idade=29.0, data=date(1995,8,1), endereco='vereda isabela almeida, 8', bairro='pantanal', estado='ES'),
    Row(nome='Sra. Ágatha Cavalcanti', cpf='952.***.***-85', idade=43.0, data=date(1982,6,25), endereco='estrada de da luz', bairro='providencia', estado='CE'),
    Row(nome='Calebe Da Cruz', cpf='746.***.***-09', idade=54.0, data=date(1971,10,17), endereco='ladeira gael marques, 25', bairro='carlos prates', estado='PA'),
    Row(nome='Maysa Costa', cpf='914.***.***-33', idade=39.0, data=date(1986,10,14), endereco='praça silva, 4', bairro='suzana', estado='PI')
] # É possivel especificar o tipo de dados, observer o campo data, coloquei data como sento do tipo date.

In [None]:
df_parcial = spark.createDataFrame(data=lista_teste, schema=schema_definido)

In [None]:
df_parcial.printSchema()

root
 |-- nome: string (nullable = true)
 |-- cpf: string (nullable = true)
 |-- idade: float (nullable = true)
 |-- data: date (nullable = true)
 |-- endereco: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- estado: string (nullable = true)



In [None]:
df_parcial.show()

+--------------------+--------------+-----+----------+--------------------+-------------+------+
|                nome|           cpf|idade|      data|            endereco|       bairro|estado|
+--------------------+--------------+-----+----------+--------------------+-------------+------+
|   Arthur Montenegro|385.***.***-80| 29.0|1995-08-01|vereda isabela al...|     pantanal|    ES|
|Sra. Ágatha Caval...|952.***.***-85| 43.0|1982-06-25|   estrada de da luz|  providencia|    CE|
|      Calebe Da Cruz|746.***.***-09| 54.0|1971-10-17|ladeira gael marq...|carlos prates|    PA|
|         Maysa Costa|914.***.***-33| 39.0|1986-10-14|      praça silva, 4|       suzana|    PI|
+--------------------+--------------+-----+----------+--------------------+-------------+------+



In [None]:
spark.stop()

## *Aula 5: Leitura e Escrita de Dados*
Aprendi a usar as funções: *write, csv, json, orc, parquet, saveAsTable, save, mode("overwrite"), createOrReplaceTempView, catalog, listDatabase, listTables*

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
df = spark.read.option("header","true").option("inferSchema","true").csv("sample_data/anscombe.json")

In [None]:
df.write.csv("anscombe.csv") # É necessário dizer que o dataset possui cabeçalho ou ele será excluído

In [None]:
df.write.mode("overwrite").option("header","true").csv("anscombe.csv") # a Função mode(overwrite) para substituir um arquivo por outro caso já exista

In [None]:
df = spark.read.option("header","true").option("inferSchema","true").csv("anscombe.csv") # Veja se o arquivo foi salvo corretamente
df.show(5)

+-------------+
|            [|
+-------------+
|{"Series":"I"|
|{"Series":"I"|
|{"Series":"I"|
|{"Series":"I"|
|{"Series":"I"|
+-------------+
only showing top 5 rows


In [None]:
# Agora voce pode testar salvar esse mesmo arquivo em diferentes formatos
df.write.option("header","true").json("anscombe.json") # Formato JSON
df.write.option("header","true").save("anscombe.parquet") # Formato automático Parquet
df.write.option("header","true").orc("anscombe.orc") # Formato ORC
df.write.option("header","true").saveAsTable("anscombe") # Formato para o seu WareHouse no MySQL

df.createOrReplaceTempView("temp_anscombe") # função para criar uma View no seu WareHouse

In [None]:
spark.catalog.listDatabases() # Ver os bancos de dados disponíveis

[Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/content/spark-warehouse')]

In [None]:
spark.catalog.setCurrentDatabase("default") # Escolher um banco de dados específico

In [None]:
spark.catalog.listTables() # Mostrar as tabelas existentes em uma base de dados específica

[Table(name='anscombe', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='temp_anscombe', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]