## PySpark - Instalando a biblioteca PySpark

- `pip install pyspark` para instalar a biblioteca mais recente do PySpark no projeto.
- `pip install findspark` para instalar a biblioteca mais recente do FindSpak

In [1]:
import findspark # importa a biblioteca para o notebook
findspark.init() #inicializa a biblioteca
from pyspark.sql import SparkSession # essa função inicializa a sessão de uso do PySpark dentro do notebook

spark = SparkSession.builder.master('local[*]').getOrCreate()

In [2]:
# importa os dados para o projeto
df = spark.sql('''select 'Sucesso total, estamos online!' as hello''')
df.show()

+--------------------+
|               hello|
+--------------------+
|Sucesso total, es...|
+--------------------+



In [3]:
# instalação  das principais bibliotecas do PySpark
# referência para funções específicas para tratamento de dados
from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StringType, StructType, StructField, IntegerType # 
from pyspark.sql.functions import col, expr, lit, substring, concat, concat_ws, when, coalesce
from pyspark.sql import functions as F  # for more sql functions
from functools import reduce
import pandas as pd

# Data Manipulation using Spark

In [4]:
# importa base de dados banklist
# inferSchema - força a inferência do schema quando o arquivo for importado
# header - informa o spark que há um cabeçalho no arquivo que está sendo importado

url_github = 'https://raw.githubusercontent.com/camimq/big_data/main/bases/banklist.csv'

pd_df = pd.read_csv(url_github)
df_banklist = spark.createDataFrame(pd_df)
# df_banklist = spark.read.csv(r'file_path_here', sep=',', inferSchema=True, header=True)

print('df.count: ', df_banklist.count())
print('df.col ct: ', len(df_banklist.columns))
print('df.columns: ', df_banklist.columns)

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


df.count:  551
df.col ct:  7
df.columns:  ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date', 'Updated Date']


### Workaround para leitura de arquivo

Para que eu não exponha o caminho de arquivo no meu computador, faço o _upload_ das bases no GitHub. Contudo, com o PySpark, por alguma razão, não consigo puxar o arquivo como fiz até o momento. Pesquisando na internet, descobri o _workaround_ acima no [StackOverflow](https://stackoverflow.com/questions/71251538/use-csv-from-github-in-pyspark) onde, basicamente, declaro uma variável com a URL onde o arquivo está e, posteriormente, crio outra variável de leitura do `csv` no pandas que, por último é utilizada para função `create` do Spark.

# Using SQL in PySpark

In [10]:

df_banklist.createOrReplaceTempView("banklist")

df_check = spark.sql('''select `Bank Name`, City, `Closing Date` from banklist''') # cria o dataframe utilizando um query SQL
df_check.show()
# df_check.show(4, truncate=False)

+--------------------+------------------+------------+
|           Bank Name|              City|Closing Date|
+--------------------+------------------+------------+
| Fayette County Bank|        Saint Elmo|   26-May-17|
|Guaranty Bank, (d...|         Milwaukee|    5-May-17|
|      First NBC Bank|       New Orleans|   28-Apr-17|
|       Proficio Bank|Cottonwood Heights|    3-Mar-17|
|Seaway Bank and T...|           Chicago|   27-Jan-17|
|Harvest Community...|        Pennsville|   13-Jan-17|
|         Allied Bank|          Mulberry|   23-Sep-16|
|The Woodbury Bank...|          Woodbury|   19-Aug-16|
|First CornerStone...|   King of Prussia|    6-May-16|
|  Trust Company Bank|           Memphis|   29-Apr-16|
|North Milwaukee S...|         Milwaukee|   11-Mar-16|
|Hometown National...|          Longview|    2-Oct-15|
| The Bank of Georgia|    Peachtree City|    2-Oct-15|
|        Premier Bank|            Denver|   10-Jul-15|
|      Edgebrook Bank|           Chicago|    8-May-15|
|         

# DataFrame Basic Operations

In [11]:
df_banklist.describe().show()

+-------+--------------------+-------+----+------------------+---------------------+------------+------------+
|summary|           Bank Name|   City|  ST|              CERT|Acquiring Institution|Closing Date|Updated Date|
+-------+--------------------+-------+----+------------------+---------------------+------------+------------+
|  count|                 551|    551| 551|               551|                  551|         551|         551|
|   mean|                NULL|   NULL|NULL|31729.392014519057|                 NULL|        NULL|        NULL|
| stddev|                NULL|   NULL|NULL|16449.761310748272|                 NULL|        NULL|        NULL|
|    min|1st American Stat...|Acworth|  AL|                91|      1st United Bank|    1-Aug-08|    1-Aug-13|
|    max|               ebank|Wyoming|  WY|             58701|  Your Community Bank|    9-Sep-11|    9-Sep-12|
+-------+--------------------+-------+----+------------------+---------------------+------------+------------+



In [14]:
# mostra a coluna Cidade e Estado
df_banklist.describe('City', 'ST').show()

+-------+-------+----+
|summary|   City|  ST|
+-------+-------+----+
|  count|    551| 551|
|   mean|   NULL|NULL|
| stddev|   NULL|NULL|
|    min|Acworth|  AL|
|    max|Wyoming|  WY|
+-------+-------+----+



# Count, Columns and Schema

In [20]:
print('Total de linhas', df_banklist.count())
print('Total de colunas:', len(df_banklist.columns))
print('Colunas:', df_banklist.columns)
print('Tipo de dados:', df_banklist.dtypes)
print('Schema:', df_banklist.schema)

Total de linhas 551
Total de colunas: 7
Colunas: ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date', 'Updated Date']
Tipo de dados: [('Bank Name', 'string'), ('City', 'string'), ('ST', 'string'), ('CERT', 'bigint'), ('Acquiring Institution', 'string'), ('Closing Date', 'string'), ('Updated Date', 'string')]
Schema: StructType([StructField('Bank Name', StringType(), True), StructField('City', StringType(), True), StructField('ST', StringType(), True), StructField('CERT', LongType(), True), StructField('Acquiring Institution', StringType(), True), StructField('Closing Date', StringType(), True), StructField('Updated Date', StringType(), True)])


In [22]:
df_banklist.printSchema()

root
 |-- Bank Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ST: string (nullable = true)
 |-- CERT: long (nullable = true)
 |-- Acquiring Institution: string (nullable = true)
 |-- Closing Date: string (nullable = true)
 |-- Updated Date: string (nullable = true)



# Remove Duplicates

In [24]:
df_banklist = df_banklist.dropDuplicates()
print('df.count: ', df_banklist.count())
print('df.columns: ', df_banklist.columns)

df.count:  551
df.columns:  ['Bank Name', 'City', 'ST', 'CERT', 'Acquiring Institution', 'Closing Date', 'Updated Date']


Como não há mudança na contagem de linhas, é possível confirmar que não há dado duplicado.

# Select Specific columns

In [25]:
df2 = df_banklist.select(*['Bank Name', 'City'])
df2.show(2)

+--------------------+--------+
|           Bank Name|    City|
+--------------------+--------+
|         Allied Bank|Mulberry|
|Highland Communit...| Chicago|
+--------------------+--------+
only showing top 2 rows



# Select Multiple Columns

In [26]:
# seleciona todas as colunas, exceto CERT e ST
col_1 = list(set(df_banklist.columns) - {'CERT', 'ST'})
df2 = df_banklist.select(*col_1)
df2.show(2) 

+---------------------+------------+------------+--------------------+--------+
|Acquiring Institution|Closing Date|Updated Date|           Bank Name|    City|
+---------------------+------------+------------+--------------------+--------+
|         Today's Bank|   23-Sep-16|   17-Nov-16|         Allied Bank|Mulberry|
| United Fidelity B...|   23-Jan-15|   21-Apr-15|Highland Communit...| Chicago|
+---------------------+------------+------------+--------------------+--------+
only showing top 2 rows



# Rename Columns

In [28]:
# renomeando as colunas dentro do dataset
df2 = df_banklist \
    .withColumnRenamed('Bank Name', 'bank_name')\
    .withColumnRenamed('Acquiring Institution', 'acq_institution')\
    .withColumnRenamed('Closing Date', 'closing_date')\
    .withColumnRenamed('ST', 'state')\
    .withColumnRenamed('CERT', 'cert')\
    .withColumnRenamed('City', 'city')\
    .withColumnRenamed('Updated Date', 'updated_date')#\
df2.show()

+--------------------+---------------+-----+-----+--------------------+------------+------------+
|           bank_name|           city|state| cert|     acq_institution|closing_date|updated_date|
+--------------------+---------------+-----+-----+--------------------+------------+------------+
|         Allied Bank|       Mulberry|   AR|   91|        Today's Bank|   23-Sep-16|   17-Nov-16|
|Highland Communit...|        Chicago|   IL|20290|United Fidelity B...|   23-Jan-15|   21-Apr-15|
|         Valley Bank|Fort Lauderdale|   FL|21793|Landmark Bank, Na...|   20-Jun-14|   29-Jun-15|
|Heritage Bank of ...|    Orange Park|   FL|26680|  FirstAtlantic Bank|   19-Apr-13|    8-Aug-16|
| Fayette County Bank|     Saint Elmo|   IL| 1802|United Fidelity B...|   26-May-17|    1-Jun-17|
|       Covenant Bank|        Chicago|   IL|22476|Liberty Bank and ...|   15-Feb-13|   21-Sep-15|
|  First Federal Bank|      Lexington|   KY|29594| Your Community Bank|   19-Apr-13|   12-Dec-16|
|Hometown National..