# Criação de Contexto

`sc` (SparkContext) é a conexão com o cluster Spark e é criado automaticamente quando o comando `pyspark` é executado.
Como estamos no Python, precisamos nos conectar ao cluster Spark por meio desse contexto.

In [None]:
#Contexto criado automaticamente

In [3]:
print(sc)

<SparkContext master=local[*] appName=PySparkShell>


In [4]:
print(sc.version)

2.2.1


In [None]:
#Criando o contexto importando o SparkContext

from pyspark import SparkContext

In [None]:
sc = SparkContext.getOrCreate()

In [None]:
print(sc)

# Criando o contexto com opções

In [None]:
from pyspark import SparkContext
#sc = SparkContext(master="local", appName="Aplicacao")

# Contexto SQL

In [None]:
sc

O SQLContext permite o processamento de arquivos JSON e integra-se com o Hive. Permite também consultas no estilo SQL utilizando JDBC.

In [None]:
print(sqlContext)

# Leitura de Dados

In [5]:
#Criando RDDs através de dados

dados_nativos = sc.parallelize([('banana',5),('laranja',4),('manga',4),('morango',2)])

In [6]:
dados_nativos.collect()

[('banana', 5), ('laranja', 4), ('manga', 4), ('morango', 2)]

In [7]:
#Criando RDDs através de arquivos

path = 'C:\\Dados\\movies.txt'

dados_arquivo = sc.textFile(path)

In [None]:
# Reading from Hadoop HDFS
# NOTE(review): this cell only stores the HDFS URI as a plain string; nothing
# is read here. Presumably it is meant to be handed to sc.textFile(...) the
# same way `path` is for the local file — confirm.

dados_hdfs = "hdfs:///user/diretorio1/diretorio2/arquivo.csv"

# Verificando a Tipagem

In [8]:
type(dados_nativos)

pyspark.rdd.RDD

In [9]:
type(dados_arquivo)

pyspark.rdd.RDD

# RDDs?

In [None]:
pyspark --master local[2]

# Imprimindo os Dados

In [10]:
dados_nativos

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:489

In [12]:
dados_arquivo

C:\Dados\movies.txt MapPartitionsRDD[2] at textFile at NativeMethodAccessorImpl.java:0

In [13]:
dados_nativos.collect()

[('banana', 5), ('laranja', 4), ('manga', 4), ('morango', 2)]

In [14]:
dados_arquivo.take(2)

['1,The Nightmare Before Christmas,1993,3.9,4568', '2,The Mummy,1932,3.5,4388']

In [30]:
import re

In [31]:
from operator import add

In [32]:
file_in = sc.textFile('C:\\Dados\\movies.txt')

In [33]:
print('number of lines in file: %s' % file_in.count())

number of lines in file: 10


In [34]:
# Sum of the lengths of every line in file_in: map each line to its length,
# then fold the lengths together with operator.add.
chars = file_in.map(len).reduce(add)

In [35]:
print('number of characters in file: %s' % chars)

number of characters in file: 341


In [36]:
# Tokenise: lowercase and strip each line, then split it on runs of non-word
# characters; flatMap flattens the per-line lists into a single RDD of tokens.
# The pattern is a raw string so that \W reaches the regex engine verbatim
# instead of being parsed as an (invalid) string escape sequence.
words = file_in.flatMap(lambda line: re.split(r'\W+', line.lower().strip()))

In [37]:
words.take(20)

['1',
 'the',
 'nightmare',
 'before',
 'christmas',
 '1993',
 '3',
 '9',
 '4568',
 '2',
 'the',
 'mummy',
 '1932',
 '3',
 '5',
 '4388',
 '3',
 'orphans',
 'of',
 'the']

In [38]:
# Keep only the tokens that are longer than three characters.
words = words.filter(lambda token: len(token) > 3)

In [39]:
words.take(20)

['nightmare',
 'before',
 'christmas',
 '1993',
 '4568',
 'mummy',
 '1932',
 '4388',
 'orphans',
 'storm',
 '1993',
 '6150',
 'object',
 'beauty',
 '1991',
 '6150',
 'night',
 'tide',
 '1963',
 '5126']

In [40]:
# Turn every token into a (token, 1) pair so occurrences can be added per key.
words = words.map(lambda token: (token, 1))

In [41]:
words.take(20)

[('nightmare', 1),
 ('before', 1),
 ('christmas', 1),
 ('1993', 1),
 ('4568', 1),
 ('mummy', 1),
 ('1932', 1),
 ('4388', 1),
 ('orphans', 1),
 ('storm', 1),
 ('1993', 1),
 ('6150', 1),
 ('object', 1),
 ('beauty', 1),
 ('1991', 1),
 ('6150', 1),
 ('night', 1),
 ('tide', 1),
 ('1963', 1),
 ('5126', 1)]

In [42]:
# Combine the values of identical keys with operator.add, leaving one
# (key, summed value) pair per distinct key.
words = words.reduceByKey(add)

In [43]:
words.take(20)

[('nightmare', 1),
 ('before', 1),
 ('christmas', 2),
 ('1993', 2),
 ('4568', 1),
 ('4388', 1),
 ('storm', 1),
 ('beauty', 1),
 ('1991', 1),
 ('night', 1),
 ('tide', 1),
 ('wedding', 1),
 ('1994', 2),
 ('boys', 1),
 ('5733', 1),
 ('version', 1),
 ('1995', 1),
 ('mummy', 1),
 ('1932', 1),
 ('orphans', 1)]

In [28]:
# Swap the two elements of each pair so the former value becomes the key,
# then sort by that key in descending order.
words = words.map(lambda pair: (pair[1], pair[0])).sortByKey(ascending=False)

In [29]:
words.take(20)

[(2, '1993'),
 (2, '1994'),
 (2, '5333'),
 (2, '6150'),
 (2, 'christmas'),
 (1, '1929'),
 (1, '1932'),
 (1, '1963'),
 (1, '1985'),
 (1, '1991'),
 (1, '1995'),
 (1, '4388'),
 (1, '4568'),
 (1, '5126'),
 (1, '5651'),
 (1, '5733'),
 (1, '6323'),
 (1, 'beauty'),
 (1, 'before'),
 (1, 'boys')]