## Configuration
---
          
- Option 1: cluster simulated by the local Spark environment, for testing
  - Can be monitored at http://localhost:4040/jobs/

- Option 2:
  - Cluster with two worker nodes installed on my machine, RioBello
  - The URL `spark://RioBello:7077` locates the master node, which keeps track of the workers
  - This cluster can be monitored at http://riobello:8080/
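
The two options differ only in the master URL and application name given to `SparkConf`. A minimal sketch of switching between them with a flag is shown below. The helper names `master_settings` and `make_context` and the `option` parameter are assumptions made for illustration; they are not part of the original notebook.

```python
def master_settings(option):
    """Return (master URL, app name) for the chosen option (hypothetical helper)."""
    settings = {
        1: ("local", "UFAM-Local"),                     # Option 1: local mode
        2: ("spark://RioBello:7077", "UFAM-RioBello"),  # Option 2: standalone cluster
    }
    if option not in settings:
        raise ValueError(f"unknown option: {option!r}")
    return settings[option]


def make_context(option=1):
    """Build a SparkContext for the chosen option (requires pyspark)."""
    # Imported lazily so master_settings() can be used without Spark installed.
    from pyspark import SparkConf, SparkContext

    master, app_name = master_settings(option)
    conf = SparkConf().setMaster(master).setAppName(app_name)
    return SparkContext(conf=conf)
```

Calling `make_context(2)` would then attempt to connect to the RioBello master, assuming that cluster is running.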
     
  

## Architecture
---

<img src="https://drive.google.com/uc?id=1L0YRx_JyflW6GiLXsVoY5q7xpy7ZGlob" width=500/> 
  

In [1]:
import warnings
warnings.filterwarnings('ignore')

from pyspark import SparkConf, SparkContext


# Stop the SparkContext if one is already running
# sc.stop()

# Option 1: local mode
conf = SparkConf().setMaster("local").setAppName("UFAM-Local")

# Option 2: standalone cluster on RioBello
#conf = SparkConf().setMaster("spark://RioBello:7077").setAppName("UFAM-RioBello")

# Connect to the Spark cluster
sc = SparkContext(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/23 10:16:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Example: RDD created from a text file
--- 
- San Francisco Crime Classification Dataset
    - 878,050 crime incident records in San Francisco, from 01/2003 to 05/2015
- Compute the number of records in each category


## Example: RDD created from a text file - File structure
--- 
 
<table>
    <tr>
   <td>  </td> <td>  </td>
  </tr>
  <tr>
 <td>
    <center> <img src="https://drive.google.com/uc?id=1LB4X6KZoUxKzYihoHPgSrN-_0B4WH4G5" width=700/> </center>
</td>
   <td>
       
- Dates: timestamp of the crime incident
- Category: category of the crime incident
  - There are 39 distinct categories
- Descript: detailed description of the crime incident
- DayOfWeek: day of the week
- PdDistrict: name of the Police Department district
- Resolution: how the crime incident was resolved
- Address: approximate address of the crime incident

   </td>
</tr>
</table>   
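
Since the pipeline addresses fields by position, it can help to derive each position from the header row rather than hard-coding an index. The sketch below assumes the header line of `sf-crime.csv` as it appears in the file; the names `HEADER` and `FIELD_INDEX` are illustrative only.

```python
# Header line of sf-crime.csv, as it appears in the file.
HEADER = "Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,X,Y"

# Map each field name to its position in a split line.
FIELD_INDEX = {name: i for i, name in enumerate(HEADER.split(","))}

print(FIELD_INDEX["Category"])  # 1
print(FIELD_INDEX["Descript"])  # 2
```

Writing `line[FIELD_INDEX["Category"]]` instead of a bare number makes the selected column explicit.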


In [2]:
# Read the data file and create the base RDD
rbase = sc.textFile("/Users/alti/Downloads/sf-crime.csv")

In [3]:
# Split each line on commas to get the fields (quoted fields that contain commas are split too)
r1 = rbase.map(lambda line:line.split(","))

In [4]:
# Drop the header row (the one whose first field is "Dates")
r2 = r1.filter(lambda line: line[0]!="Dates")

In [5]:
# Emit a (key, 1) pair for each record.
# Note: index 2 is the Descript field; Category is index 1.
r3 = r2.map(lambda line: (line[2],1))

In [6]:
# Sum the counts for each key
r4 = r3.reduceByKey(lambda a,b:a+b)

In [7]:
# Sort by key, in ascending order
r5 = r4.sortByKey()

In [8]:
# Send the result back to the driver
resultado = r5.collect()
print(resultado[0:10])

[('"ACCESS CARD INFORMATION', 319), ('"AEROSOL CONTAINER; SALE', 24), ('"AGGRAVATED ASSAULT OF POLICE OFFICER', 171), ('"AIDED CASE', 24148), ('"ALCOHOLIC BEVERAGE', 1), ('"AMMUNITION', 128), ('"ANIMAL', 15), ('"ARMOR PENETRATING AMMUNITION', 2), ('"ASSAULT', 228), ('"ATTEMPTED KIDNAPPING', 93)]


In [9]:
# Full pipeline
resultado = sc.textFile("/Users/alti/Downloads/sf-crime.csv")\
              .map(lambda line:line.split(","))\
              .filter(lambda line: line[0]!="Dates")\
              .map(lambda line: (line[2],1))\
              .reduceByKey(lambda a,b:a+b)\
              .sortByKey()\
              .collect()

print(resultado[0:10])

                                                                                

[('"ACCESS CARD INFORMATION', 319), ('"AEROSOL CONTAINER; SALE', 24), ('"AGGRAVATED ASSAULT OF POLICE OFFICER', 171), ('"AIDED CASE', 24148), ('"ALCOHOLIC BEVERAGE', 1), ('"AMMUNITION', 128), ('"ANIMAL', 15), ('"ARMOR PENETRATING AMMUNITION', 2), ('"ASSAULT', 228), ('"ATTEMPTED KIDNAPPING', 93)]


## Result of Each Operation
---

In [10]:
# Read the data file and create the base RDD
rbase = sc.textFile("/Users/alti/Downloads/sf-crime.csv")
rbase.cache().collect()[0:5]

['Dates,Category,Descript,DayOfWeek,PdDistrict,Resolution,Address,X,Y',
 '2015-05-13 23:53:00,WARRANTS,WARRANT ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425891675136,37.7745985956747',
 '2015-05-13 23:53:00,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425891675136,37.7745985956747',
 '2015-05-13 23:33:00,OTHER OFFENSES,TRAFFIC VIOLATION ARREST,Wednesday,NORTHERN,"ARREST, BOOKED",VANNESS AV / GREENWICH ST,-122.42436302145,37.8004143219856',
 '2015-05-13 23:30:00,LARCENY/THEFT,GRAND THEFT FROM LOCKED AUTO,Wednesday,NORTHERN,NONE,1500 Block of LOMBARD ST,-122.42699532676599,37.80087263276921']

In [11]:
# Split each line on commas to get the fields (quoted fields that contain commas are split too)
r1 = rbase.map(lambda line:line.split(","))
r1.cache().collect()[0:5]

[['Dates',
  'Category',
  'Descript',
  'DayOfWeek',
  'PdDistrict',
  'Resolution',
  'Address',
  'X',
  'Y'],
 ['2015-05-13 23:53:00',
  'WARRANTS',
  'WARRANT ARREST',
  'Wednesday',
  'NORTHERN',
  '"ARREST',
  ' BOOKED"',
  'OAK ST / LAGUNA ST',
  '-122.425891675136',
  '37.7745985956747'],
 ['2015-05-13 23:53:00',
  'OTHER OFFENSES',
  'TRAFFIC VIOLATION ARREST',
  'Wednesday',
  'NORTHERN',
  '"ARREST',
  ' BOOKED"',
  'OAK ST / LAGUNA ST',
  '-122.425891675136',
  '37.7745985956747'],
 ['2015-05-13 23:33:00',
  'OTHER OFFENSES',
  'TRAFFIC VIOLATION ARREST',
  'Wednesday',
  'NORTHERN',
  '"ARREST',
  ' BOOKED"',
  'VANNESS AV / GREENWICH ST',
  '-122.42436302145',
  '37.8004143219856'],
 ['2015-05-13 23:30:00',
  'LARCENY/THEFT',
  'GRAND THEFT FROM LOCKED AUTO',
  'Wednesday',
  'NORTHERN',
  'NONE',
  '1500 Block of LOMBARD ST',
  '-122.42699532676599',
  '37.80087263276921']]
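
The output above shows that splitting on every comma breaks the quoted `Resolution` value `"ARREST, BOOKED"` into two fields, which shifts the columns that follow it. One possible fix is to parse each line with Python's `csv` module, which honors quotes. The sketch below is plain Python. `parse_line` is a hypothetical helper that could be passed to `rbase.map(parse_line)` in place of the `split(",")` lambda, assuming no field contains an embedded newline.

```python
import csv

def parse_line(line):
    """Parse one CSV line, keeping quoted fields that contain commas intact."""
    return next(csv.reader([line]))

# One data line copied from the file contents shown above.
sample = ('2015-05-13 23:53:00,WARRANTS,WARRANT ARREST,Wednesday,NORTHERN,'
          '"ARREST, BOOKED",OAK ST / LAGUNA ST,-122.425891675136,37.7745985956747')

naive = sample.split(",")    # splits inside the quoted field as well
parsed = parse_line(sample)  # respects the quotes

print(len(naive), len(parsed))  # 10 9
print(parsed[5])                # ARREST, BOOKED
```

With quote-aware parsing every record has the same nine fields, so positional access stays aligned with the header.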

In [12]:
# Drop the header row (the one whose first field is "Dates")
r2 = r1.filter(lambda line: line[0]!="Dates")
r2.cache().collect()[0:5]

[['2015-05-13 23:53:00',
  'WARRANTS',
  'WARRANT ARREST',
  'Wednesday',
  'NORTHERN',
  '"ARREST',
  ' BOOKED"',
  'OAK ST / LAGUNA ST',
  '-122.425891675136',
  '37.7745985956747'],
 ['2015-05-13 23:53:00',
  'OTHER OFFENSES',
  'TRAFFIC VIOLATION ARREST',
  'Wednesday',
  'NORTHERN',
  '"ARREST',
  ' BOOKED"',
  'OAK ST / LAGUNA ST',
  '-122.425891675136',
  '37.7745985956747'],
 ['2015-05-13 23:33:00',
  'OTHER OFFENSES',
  'TRAFFIC VIOLATION ARREST',
  'Wednesday',
  'NORTHERN',
  '"ARREST',
  ' BOOKED"',
  'VANNESS AV / GREENWICH ST',
  '-122.42436302145',
  '37.8004143219856'],
 ['2015-05-13 23:30:00',
  'LARCENY/THEFT',
  'GRAND THEFT FROM LOCKED AUTO',
  'Wednesday',
  'NORTHERN',
  'NONE',
  '1500 Block of LOMBARD ST',
  '-122.42699532676599',
  '37.80087263276921'],
 ['2015-05-13 23:30:00',
  'LARCENY/THEFT',
  'GRAND THEFT FROM LOCKED AUTO',
  'Wednesday',
  'PARK',
  'NONE',
  '100 Block of BRODERICK ST',
  '-122.438737622757',
  '37.771541172057795']]

In [13]:
# Emit a (key, 1) pair for each record.
# Note: index 2 is the Descript field; Category is index 1.
r3 = r2.map(lambda line: (line[2],1))
r3.cache().collect()[0:5]

[('WARRANT ARREST', 1),
 ('TRAFFIC VIOLATION ARREST', 1),
 ('TRAFFIC VIOLATION ARREST', 1),
 ('GRAND THEFT FROM LOCKED AUTO', 1),
 ('GRAND THEFT FROM LOCKED AUTO', 1)]

In [14]:
# Sum the counts for each key
r4 = r3.reduceByKey(lambda a,b:a+b)
r4.cache().collect()[0:5]

[('GRAND THEFT FROM UNLOCKED AUTO', 4096),
 ('"MALICIOUS MISCHIEF', 43753),
 ('"ROBBERY', 6460),
 ('SUSPICIOUS PACKAGE', 25),
 ('"AIDED CASE', 24148)]
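
`reduceByKey` merges all values that share a key using the supplied two-argument function, which should be associative and commutative. The plain-Python sketch below mimics that behavior on the five pairs shown earlier to make the semantics concrete. `reduce_by_key` is an illustrative stand-in, not Spark's implementation, and it ignores partitioning and shuffling.

```python
from operator import add

def reduce_by_key(pairs, func):
    """Combine the values of equal keys with func, mirroring reduceByKey logically."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

pairs = [("WARRANT ARREST", 1),
         ("TRAFFIC VIOLATION ARREST", 1),
         ("TRAFFIC VIOLATION ARREST", 1),
         ("GRAND THEFT FROM LOCKED AUTO", 1),
         ("GRAND THEFT FROM LOCKED AUTO", 1)]

print(reduce_by_key(pairs, add))
# {'WARRANT ARREST': 1, 'TRAFFIC VIOLATION ARREST': 2, 'GRAND THEFT FROM LOCKED AUTO': 2}
```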

In [102]:
# Sort by key, in ascending order
r5 = r4.sortByKey()
r5.cache().collect()[0:5]



[('"ACCESS CARD INFORMATION', 319),
 ('"AEROSOL CONTAINER; SALE', 24),
 ('"AGGRAVATED ASSAULT OF POLICE OFFICER', 171),
 ('"AIDED CASE', 24148),
 ('"ALCOHOLIC BEVERAGE', 1)]

In [103]:
# top(5): the 5 largest elements, returned in descending order
r5.top(5)

[('YOUTH COURT', 15),
 ('WILLFUL CRUELTY TO CHILD', 114),
 ('WEARING THE APPAREL OF OPPOSITE SEX TO DECEIVE', 1),
 ('WEARING MASK OR DISGUISE FOR UNLAWFUL PURPOSE', 4),
 ('WEAPONS POSSESSION BY JUVENILE SUSPECT', 13)]

In [104]:
# take(5): the first 5 elements in the RDD's current order
r5.take(5)

[('"ACCESS CARD INFORMATION', 319),
 ('"AEROSOL CONTAINER; SALE', 24),
 ('"AGGRAVATED ASSAULT OF POLICE OFFICER', 171),
 ('"AIDED CASE', 24148),
 ('"ALCOHOLIC BEVERAGE', 1)]
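
`take(n)` returns the first `n` elements in the RDD's current order, while `top(n)` returns the `n` largest elements in descending order, which is why the two calls give different results on the same sorted RDD. The local analogy below uses `sorted` and `heapq.nlargest` on a few pairs taken from the results. It illustrates the ordering only, not how Spark computes it.

```python
import heapq

counts = [('SUSPICIOUS PACKAGE', 25),
          ('"AIDED CASE', 24148),
          ('YOUTH COURT', 15),
          ('"ROBBERY', 6460)]

ascending = sorted(counts)               # analogous to sortByKey() then collect()
first_two = ascending[:2]                # analogous to take(2) on the sorted RDD
largest_two = heapq.nlargest(2, counts)  # analogous to top(2)

print(first_two)    # [('"AIDED CASE', 24148), ('"ROBBERY', 6460)]
print(largest_two)  # [('YOUTH COURT', 15), ('SUSPICIOUS PACKAGE', 25)]
```

Keys that begin with a double quote sort before keys that begin with a letter, so the fragments left over from the comma split appear first in ascending order.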

In [16]:
# Send the result back to the driver
resultado = r5.collect()
print(resultado[0:10])

[('"ACCESS CARD INFORMATION', 319), ('"AEROSOL CONTAINER; SALE', 24), ('"AGGRAVATED ASSAULT OF POLICE OFFICER', 171), ('"AIDED CASE', 24148), ('"ALCOHOLIC BEVERAGE', 1), ('"AMMUNITION', 128), ('"ANIMAL', 15), ('"ARMOR PENETRATING AMMUNITION', 2), ('"ASSAULT', 228), ('"ATTEMPTED KIDNAPPING', 93)]


## Example: RDD created in memory
--- 

In [88]:
texto = ("Project Gutenberg’s",
    "Alice’s Adventures in Wonderland",
    "Project Gutenberg’s",
    "Adventures in Wonderland",
    "Project Gutenberg’s")

In [89]:
# Distribute the local collection as an RDD
rtexto = sc.parallelize(texto)
rtexto.collect()

['Project Gutenberg’s',
 'Alice’s Adventures in Wonderland',
 'Project Gutenberg’s',
 'Adventures in Wonderland',
 'Project Gutenberg’s']

In [90]:
# map: one output element (a list of words) per input line
texto_quebra1 = rtexto.map(lambda x : x.split())
texto_quebra1.collect()

[['Project', 'Gutenberg’s'],
 ['Alice’s', 'Adventures', 'in', 'Wonderland'],
 ['Project', 'Gutenberg’s'],
 ['Adventures', 'in', 'Wonderland'],
 ['Project', 'Gutenberg’s']]

In [91]:
# flatMap: flattens the per-line word lists into a single RDD of words
texto_quebra2 = rtexto.flatMap(lambda x : x.split())
texto_quebra2.collect()

['Project',
 'Gutenberg’s',
 'Alice’s',
 'Adventures',
 'in',
 'Wonderland',
 'Project',
 'Gutenberg’s',
 'Adventures',
 'in',
 'Wonderland',
 'Project',
 'Gutenberg’s']
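
The difference between `map` and `flatMap` can be reproduced locally: `map` keeps one output per input, while `flatMap` concatenates the per-input results into one sequence. The plain-Python analogy below uses a list comprehension and `itertools.chain` on the first two lines of `texto`; it is a sketch of the semantics, not of Spark's execution.

```python
from itertools import chain

texto = ("Project Gutenberg’s",
         "Alice’s Adventures in Wonderland")

# Like rtexto.map(lambda x: x.split()): one list of words per line.
mapped = [line.split() for line in texto]

# Like rtexto.flatMap(lambda x: x.split()): all words in a single flat list.
flat_mapped = list(chain.from_iterable(line.split() for line in texto))

print(mapped)
print(flat_mapped)
```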

In [92]:
# Pair each word with a count of 1
rpalavras = texto_quebra2.map(lambda x:(x,1))
rpalavras.collect()

[('Project', 1),
 ('Gutenberg’s', 1),
 ('Alice’s', 1),
 ('Adventures', 1),
 ('in', 1),
 ('Wonderland', 1),
 ('Project', 1),
 ('Gutenberg’s', 1),
 ('Adventures', 1),
 ('in', 1),
 ('Wonderland', 1),
 ('Project', 1),
 ('Gutenberg’s', 1)]

In [59]:
import warnings
warnings.filterwarnings('ignore')

# Sum the counts for each word
rcontador = rpalavras.reduceByKey(lambda a,b: a+b)
rcontador.collect()

[('Project', 3),
 ('Gutenberg’s', 3),
 ('Alice’s', 1),
 ('Adventures', 2),
 ('in', 2),
 ('Wonderland', 2)]

In [79]:
texto_quebra1.collect()

[['Project', 'Gutenberg’s'],
 ['Alice’s', 'Adventures', 'in', 'Wonderland'],
 ['Project', 'Gutenberg’s'],
 ['Adventures', 'in', 'Wonderland'],
 ['Project', 'Gutenberg’s']]

In [101]:
# Key each line by its first word, then group the remaining words of each line under that key
texto_quebra1.map(lambda x:(x[0],x[1:])).groupByKey().mapValues(list).collect()

[('Project', [['Gutenberg’s'], ['Gutenberg’s'], ['Gutenberg’s']]),
 ('Alice’s', [['Adventures', 'in', 'Wonderland']]),
 ('Adventures', [['in', 'Wonderland']])]
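
`groupByKey` gathers every value that shares a key without combining the values. A plain-Python sketch with `defaultdict` reproduces the result above. It is an analogy for the semantics only; the variable names are illustrative.

```python
from collections import defaultdict

lines = [['Project', 'Gutenberg’s'],
         ['Alice’s', 'Adventures', 'in', 'Wonderland'],
         ['Project', 'Gutenberg’s'],
         ['Adventures', 'in', 'Wonderland'],
         ['Project', 'Gutenberg’s']]

grouped = defaultdict(list)
for words in lines:
    key, rest = words[0], words[1:]  # same keying as lambda x: (x[0], x[1:])
    grouped[key].append(rest)        # collect values per key, no reduction

print(list(grouped.items()))
```

When only an aggregate per key is needed, as in the word count, `reduceByKey` is usually the better fit because it combines values before they are shuffled.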