## Instalação do PySpark

# Instalando o Java 8

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Baixando spark

In [2]:
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

# Descompatacando o spark

In [3]:
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

# Instalando o findspark

In [4]:
!pip install -q findspark

# Instalando o pyspark


In [5]:
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


# Definindo as variáveis de ambiente

In [6]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.1.2-bin-hadoop2.7'

# Iniciando o spark

In [7]:
import findspark
findspark.init()

# Data Frame

In [8]:
#CRIANDO UMA SESSÃO

from pyspark.sql import SparkSession

spark = SparkSession.builder\
.master('local[*]')\
.appName('Dataframes com spark')\
.getOrCreate()

#CRIANDO DATA FRAME

dados = [('Carla', '23'),
         ('Zeca', '33'),
         ('Pedro', '52'),
         ('Gabi', '19'), 
         ('Mariana', '22'), 
         ('João,', '34'), 
         ('Kelly', '43')
         ]

colunas = ['Nome', 'Idade']

df1 = spark.createDataFrame(dados, colunas)
df1.show()

df1.limit(20).toPandas()

+-------+-----+
|   Nome|Idade|
+-------+-----+
|  Carla|   23|
|   Zeca|   33|
|  Pedro|   52|
|   Gabi|   19|
|Mariana|   22|
|  João,|   34|
|  Kelly|   43|
+-------+-----+



Unnamed: 0,Nome,Idade
0,Carla,23
1,Zeca,33
2,Pedro,52
3,Gabi,19
4,Mariana,22
5,"João,",34
6,Kelly,43


In [9]:
#CRIANDO UMA SESSÃO

from pyspark.sql import SparkSession
from pyspark.sql import Row

spark = SparkSession.builder\
.master('local[*]')\
.appName('Dataframes com spark')\
.getOrCreate()

#CRIANDO DATA FRAME

df2 = spark.createDataFrame(
    [Row(descrição='Monitor', preço=800, fabricante='Samsung')]

)

df2.show()
df2.limit(10).toPandas()

df3 = spark.createDataFrame(
    [Row(descrição='Raygon', preço=800, fabricante='AMD')]

)

df_final = df2.unionAll(df3)

df_final.show()

df_final.printSchema()

+---------+-----+----------+
|descrição|preço|fabricante|
+---------+-----+----------+
|  Monitor|  800|   Samsung|
+---------+-----+----------+

+---------+-----+----------+
|descrição|preço|fabricante|
+---------+-----+----------+
|  Monitor|  800|   Samsung|
|   Raygon|  800|       AMD|
+---------+-----+----------+

root
 |-- descrição: string (nullable = true)
 |-- preço: long (nullable = true)
 |-- fabricante: string (nullable = true)



In [10]:
df_classe = spark.createDataFrame(
    [Row(matricula='0001', nome='Nando', AV1=10, AV2=10, AV3=10)]
)
df_classe2 = spark.createDataFrame(
    [Row(matricula='0002', nome='Felipe', AV1=10, AV2=10, AV3=10)]
)
df_classe3 = spark.createDataFrame(
    [Row(matricula='0003', nome='Fer', AV1=10, AV2=10, AV3=10)]
)
df_classe4 = spark.createDataFrame(
    [Row(matricula='0004', nome='Fernando', AV1=10, AV2=10, AV3=10)]
)
df_classe5 = spark.createDataFrame(
    [Row(matricula='0005', nome='Luiz', AV1=10, AV2=10, AV3=10)]
)

df_final = df_classe.unionAll(df_classe2).unionAll(df_classe3).unionAll(df_classe4).unionAll(df_classe5)
df_final.show()

+---------+--------+---+---+---+
|matricula|    nome|AV1|AV2|AV3|
+---------+--------+---+---+---+
|     0001|   Nando| 10| 10| 10|
|     0002|  Felipe| 10| 10| 10|
|     0003|     Fer| 10| 10| 10|
|     0004|Fernando| 10| 10| 10|
|     0005|    Luiz| 10| 10| 10|
+---------+--------+---+---+---+



In [11]:
df_clubes = [
    {'Nome':'Nando', 'Fundação':1969},
    {'Nome':'Guilherme', 'Fundação':1973},
    {'Nome':'Ferreira', 'Fundação':2003},
    {'Nome':'Heitor', 'Fundação':2000},

]

df_clubes2 = spark.createDataFrame(df_clubes)

df_clubes2.limit(5).toPandas()

Unnamed: 0,Fundação,Nome
0,1969,Nando
1,1973,Guilherme
2,2003,Ferreira
3,2000,Heitor


In [12]:
path = "/content/sample_data/california_housing_test.csv"

house_df = spark.read.csv(path, sep=',', inferSchema=True, header=True)

house_df = house_df.toDF('longitude','latitude','idade_mediana','total_comodos','total_quartos','população', 'familias','renda_mediana','valorCasa_mediana')

house_df.count()
house_df.select('*').where('total_quartos > 5000').show()
#house_df.select('idade_mediana','total_quartos','valorCasa_mediana').show()
#house_df.limit(100).toPandas()
#house_df.show()

+---------+--------+-------------+-------------+-------------+---------+--------+-------------+-----------------+
|longitude|latitude|idade_mediana|total_comodos|total_quartos|população|familias|renda_mediana|valorCasa_mediana|
+---------+--------+-------------+-------------+-------------+---------+--------+-------------+-----------------+
|  -121.53|   38.48|          5.0|      27870.0|       5027.0|  11935.0|  4855.0|       4.8811|         212200.0|
|  -118.44|   33.98|         21.0|      18132.0|       5419.0|   7431.0|  4930.0|       5.3359|         500001.0|
|   -117.2|   33.58|          2.0|      30450.0|       5033.0|   9419.0|  3197.0|       4.5936|         174300.0|
+---------+--------+-------------+-------------+-------------+---------+--------+-------------+-----------------+



In [13]:
house_df.select('*').where('total_quartos > 5000').show()


+---------+--------+-------------+-------------+-------------+---------+--------+-------------+-----------------+
|longitude|latitude|idade_mediana|total_comodos|total_quartos|população|familias|renda_mediana|valorCasa_mediana|
+---------+--------+-------------+-------------+-------------+---------+--------+-------------+-----------------+
|  -121.53|   38.48|          5.0|      27870.0|       5027.0|  11935.0|  4855.0|       4.8811|         212200.0|
|  -118.44|   33.98|         21.0|      18132.0|       5419.0|   7431.0|  4930.0|       5.3359|         500001.0|
|   -117.2|   33.58|          2.0|      30450.0|       5033.0|   9419.0|  3197.0|       4.5936|         174300.0|
+---------+--------+-------------+-------------+-------------+---------+--------+-------------+-----------------+



# Criando uma sessão no spark

In [14]:
#spark = SparkSession.builder\
# .master('local[*]')\
#  .appName('Meu App Spark')\
#  .getOrCreate()

In [15]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('Meu App Spark').getOrCreate()

# Criando um dataset

In [16]:
dataset = spark.read.csv('/content/sample_data/california_housing_test.csv', inferSchema=True, header=True)

# Printando o schema

In [17]:
dataset.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



# Encerrar a sessão

In [18]:
spark.stop()