# PySpark

Spark es un "framework" gratuito y de código abierto para la computación en **clusters** de ordenadores (agrupación o conglomerado de ordenadores con un "hardware" común que se comportan como si fuesen un único ordenador), que se usan mayoritariamente para procesar cálculos complejos o grandes volúmenes de datos. Una de las principales ventajas de Spark es que combina esta capacidad de computación distribuida con un código sencillo y elegante, y que además ofrece soporte a múltiples lenguajes de programación, entre ellos Python.

## Funciones lambda

In [None]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("Mi programa")
sc = SparkContext(conf = conf)
lines = sc.textFile("ejemplo.txt")

In [None]:
sc

## Dataframes

In [None]:
from pyspark import SparkConf, SparkContext

# Reuse an existing context if one is running (creating a second one raises
# "Cannot run multiple SparkContexts at once").
conf = SparkConf().setMaster("local").setAppName("Mi programa")
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
import pandas as pd
df = pd.read_cvs("C:/Users/Roger Castillo/Desktop/GitHub/Python/Ciencia_Datos/base_datos_2008.csv", rows=1000000)

In [None]:
df.head(2)

In [None]:
from pyspark.sql.types import StringType
from pyspark import SQLContext

# SQL entry point bound to the existing SparkContext.
sqlContext = SQLContext(sc)

# Read the CSV as a Spark DataFrame: first line is the header and column
# types are inferred from the data.
dfspark = (
    sqlContext.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("C:/Users/Roger Castillo/Desktop/GitHub/Python/Ciencia_Datos/base_datos_2008.csv")
)

In [None]:
dfspark.show(2)

In [None]:
dfspark.head(2)

In [None]:
dfspark.count()

In [None]:
dfspark = dfspark.sample(fraction=0.001,withReplacement = False)

In [None]:
dfspark.count()

In [None]:
# Cast ArrDelay to integer; the type name must be a string ("integer"),
# the bare name `integer` is undefined.
dfspark = dfspark.withColumn("ArrDelay", dfspark["ArrDelay"].cast("integer"))

In [None]:
# Drop rows with a null in any of the delay/distance columns
# (na.drop with keyword `subset`).
df2 = dfspark.na.drop(subset=["ArrDelay", "DepDelay", "Distance"])

In [None]:
df2 = df2.filter("ArrDelay is not NULL")

In [None]:
df2.count()

In [None]:
df2.printSchema()

In [None]:
import numpy as np
media = np.mean(df2.select("ArrDelay").collect())

In [None]:
media

In [None]:
df2.rdd.getNumPartitions()

## Transformaciones básicas

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType
from pyspark import SQLContext
import numpy as np

# Source CSV, kept in one place so it is easy to change.
RUTA_CSV = "C:/Users/Roger Castillo/Desktop/GitHub/Python/Ciencia_Datos/base_datos_2008.csv"

conf = SparkConf().setMaster("local").setAppName("Mi programa")
# Reuse a running context instead of failing on a second SparkContext.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

# Header row + inferred column types.
dfspark = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load(RUTA_CSV)
# Random ~0.1% sample without replacement.
dfspark = dfspark.sample(fraction=0.001, withReplacement=False)
dfspark = dfspark.withColumn("ArrDelay", dfspark["ArrDelay"].cast("integer"))

# Drop rows with nulls in the key numeric columns, then exact duplicates.
df2 = dfspark.na.drop(subset=["ArrDelay", "DepDelay", "Distance"])
df2 = df2.filter("ArrDelay is not NULL")
df2 = df2.dropDuplicates()

In [None]:
df2.select("ArrDelay").filter("ArrDelay > 60").takes(5) #[0]

In [None]:
df2.filter("ArrDelay > 60").take(5)

In [None]:
media = np.mean(df2.select('ArrDelay').collect())
# Each RDD element is a Row; take its single field before the arithmetic
# (Row - float raises TypeError).
df2.select("ArrDelay").rdd.map(lambda fila: (fila[0] - media) ** 2).take(10)

In [None]:
df2.groupBy("DayOfWeek").count().show()

In [None]:
df2.groupBy("DayOfWeek").mean("ArrDelay").show()

In [None]:
df2.select("Origin").rdd.distinct().take(5)

In [None]:
df2.select("Origin").rrd.distinct.count()

In [None]:
df2.orderBy(df2.ArrDelay.desc()).take(5)

## Funciones básicas

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType
from pyspark import SQLContext
import numpy as np

# Source CSV, kept in one place so it is easy to change.
RUTA_CSV = "C:/Users/Roger Castillo/Desktop/GitHub/Python/Ciencia_Datos/base_datos_2008.csv"

conf = SparkConf().setMaster("local").setAppName("Mi programa")
# Reuse a running context instead of failing on a second SparkContext.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

# Header row + inferred column types.
dfspark = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load(RUTA_CSV)
# Random ~0.1% sample without replacement.
dfspark = dfspark.sample(fraction=0.001, withReplacement=False)
dfspark = dfspark.withColumn("ArrDelay", dfspark["ArrDelay"].cast("integer"))

# Drop rows with nulls in the key numeric columns, then exact duplicates.
df2 = dfspark.na.drop(subset=["ArrDelay", "DepDelay", "Distance"])
df2 = df2.filter("ArrDelay is not NULL")
df2 = df2.dropDuplicates()

In [None]:
df2.select("ArrDelay").describe().show()

In [None]:
df2.select("Origin").rdd.countByValue()

In [None]:
df2.select("ArrDelay").rdd.collect()

In [None]:
df2.select("Origin").rdd.collect()

In [None]:
df2.crosstab("DayOfWeek","Origin").take(2)

## Operaciones numéricas con RDD

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType
from pyspark import SQLContext
import numpy as np

# Source CSV, kept in one place so it is easy to change.
RUTA_CSV = "C:/Users/Roger Castillo/Desktop/GitHub/Python/Ciencia_Datos/base_datos_2008.csv"

conf = SparkConf().setMaster("local").setAppName("Mi programa")
# Reuse a running context instead of failing on a second SparkContext.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

# Header row + inferred column types.
dfspark = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load(RUTA_CSV)
# Random ~0.1% sample without replacement.
dfspark = dfspark.sample(fraction=0.001, withReplacement=False)
dfspark = dfspark.withColumn("ArrDelay", dfspark["ArrDelay"].cast("integer"))

# Drop rows with nulls in the key numeric columns, then exact duplicates.
df2 = dfspark.na.drop(subset=["ArrDelay", "DepDelay", "Distance"])
df2 = df2.filter("ArrDelay is not NULL")
df2 = df2.dropDuplicates()

In [None]:
lista = sc.parallelize(range(1,1000000))
lista.reduce(lambda x,y:x+y)

In [None]:
lista.sum()

In [None]:
from pyspark.sql.funtions import mean, stddev, col
media = df2.select(mean(col('ArrDelay'))).collect()
std = df2.select(stddev(col('ArrDelay'))).collect()

In [None]:
std

In [None]:
std[0][0]

In [None]:
df2.withColumn('Distance', df2['ArrDelay'] - df2['DepDelay']).collect)

In [None]:
df2.withColumn('Standar', (df2['ArrDelay'] - media[0][0])/std[0][0]).collect()

## Spark avanzado

In [None]:
from pyspark import SparkConf, SparkContext

# Reuse a running context instead of failing on a second SparkContext.
conf = SparkConf().setMaster("local").setAppName("Mi programa")
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
x = sc.parallelizr([("a", 5), ("b", 6), ("c", 7), ("d", 8)])
y = sc.parallelizr([("a", 1), ("a", 2), ("c", 3)])

In [None]:
x.join(y).collect()

In [None]:
x.join(x).collect()

In [None]:
y.leftOuterJoin(x).collect()

In [None]:
y.leftOuterJoin(y).collect()

In [None]:
y.rightOuterJoin(x).collect()

## Acumuladores

Detectando patrones en los datos

In [None]:
from pyspark import SparkConf, SparkContext

# Reuse a running context instead of failing on a second SparkContext.
conf = SparkConf().setMaster("local").setAppName("Mi programa")
sc = SparkContext.getOrCreate(conf=conf)
# RDD with one element per line of ejemplo.txt.
lines = sc.textFile("ejemplo.txt")

In [None]:
# Counters for lines mentioning Python and lines mentioning Spark.
py, sp = sc.accumulator(0), sc.accumulator(0)

In [None]:
def lenguajes(linea):
    """Filter predicate: keep lines that mention Python or Spark.

    As a side effect, adds 1 to the global counter ``py`` when the line
    contains "Python" and 1 to ``sp`` when it contains "Spark".

    Returns True if the line contains either word, False otherwise.
    """
    global py, sp
    tiene_python = "Python" in linea
    tiene_spark = "Spark" in linea
    if tiene_python:
        py += 1
    if tiene_spark:
        sp += 1
    return tiene_python or tiene_spark

valores = lines.filter(lenguajes)

In [None]:
valores.collect

In [None]:
py

In [None]:
sp

## Cómo construir funciones map

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType
from pyspark import SQLContext

# Source CSV, kept in one place so it is easy to change.
RUTA_CSV = "C:/Users/Roger Castillo/Desktop/GitHub/Python/Ciencia_Datos/base_datos_2008.csv"

conf = SparkConf().setMaster("local").setAppName("Mi programa")
# Reuse a running context instead of failing on a second SparkContext.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

# Header row + inferred column types.
dfspark = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load(RUTA_CSV)
# Random ~0.1% sample without replacement.
dfspark = dfspark.sample(fraction=0.001, withReplacement=False)
dfspark = dfspark.withColumn("ArrDelay", dfspark["ArrDelay"].cast("integer"))

# Drop rows with nulls in the key numeric columns, then exact duplicates.
df2 = dfspark.na.drop(subset=["ArrDelay", "DepDelay", "Distance"])
df2 = df2.filter("ArrDelay is not NULL")
df2 = df2.dropDuplicates()

In [None]:
A = sc.parallelize(df2.select("Origin").rdd.collect())

In [None]:
A.perist()

In [None]:
mapfuntion = A.map(lambda x: (x,1))

In [None]:
mapfuntion.collect()

In [None]:
def fun(x):
    """Pair the record ``x`` with a weight chosen from its first field.

    Returns ``(x, 2)`` when ``x[0]`` is "SEA", "ATL" or "HOU",
    ``(x, 3)`` when it is "DEN", and ``(x, 1)`` otherwise.
    """
    codigo = x[0]
    if codigo in ("SEA", "ATL", "HOU"):
        peso = 2
    elif codigo == "DEN":
        peso = 3
    else:
        peso = 1
    return (x, peso)


In [None]:
mapfuntion2 = A.map(fun)

In [None]:
mapfuntion2.collect()

## Cómo construir funciones reduce

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType
from pyspark import SQLContext

# Source CSV, kept in one place so it is easy to change.
RUTA_CSV = "C:/Users/Roger Castillo/Desktop/GitHub/Python/Ciencia_Datos/base_datos_2008.csv"

conf = SparkConf().setMaster("local").setAppName("Mi programa")
# Reuse a running context instead of failing on a second SparkContext.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

# Header row + inferred column types.
dfspark = sqlContext.read.format("csv").option("header", "true").option("inferSchema", "true").load(RUTA_CSV)
# Random ~0.1% sample without replacement.
dfspark = dfspark.sample(fraction=0.001, withReplacement=False)
dfspark = dfspark.withColumn("ArrDelay", dfspark["ArrDelay"].cast("integer"))

# Drop rows with nulls in the key numeric columns, then exact duplicates.
df2 = dfspark.na.drop(subset=["ArrDelay", "DepDelay", "Distance"])
df2 = df2.filter("ArrDelay is not NULL")
df2 = df2.dropDuplicates()

In [None]:
A = sc.parallelize(df2.select("Origin").rdd.collect())

In [None]:
A.perist()

In [None]:
mapfuntion = A.map(lambda x: (x,1))

In [None]:
mapfuntion2 = A.map(fun)

In [None]:
reducefuntion = mapfuntion,reduceByKey(lambda x,y: x+y)

In [None]:
reducefuntion.collect() # Se usa take(10) para hacerla más eficiente

In [None]:
reducefuntion2 = mapfuntion2,reduceByKey(lambda x,y: x+y)

In [None]:
reducefuntion2.take(10)

In [None]:
reducefuntion.sortByKey().take(10)

In [None]:
reducefuntion.sortBy(lambda x: x[1], ascending=False).take(10)

In [None]:
reducefuntion2.sortBy(lambda x: x[1], ascending=False).take(10)

## Ejemplos básicos de MapReduce

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType
from pyspark import SQLContext

conf = SparkConf().setMaster("local").setAppName("Mi programa")
# Reuse a running context instead of failing on a second SparkContext.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

In [None]:
lines = sc.textFile("ejemplo.txt")
# Optional: uncomment to spread the lines over 3 partitions.
#lines = lines.repartition(3)
# How many partitions back the RDD.
lines.getNumPartitions()

In [None]:
# Counters for lines mentioning Python and lines mentioning Spark.
py, sp = sc.accumulator(0), sc.accumulator(0)

def lenguajes(linia):
    """Filter predicate: keep lines that mention Python or Spark.

    As a side effect, adds 1 to the global counter ``py`` when the line
    contains "Python" and 1 to ``sp`` when it contains "Spark".

    Returns True if the line contains either word, False otherwise.
    """
    global py, sp
    menciona_python = "Python" in linia
    menciona_spark = "Spark" in linia
    if menciona_python:
        py += 1
    if menciona_spark:
        sp += 1
    return menciona_python or menciona_spark

valores = lines.filter(lenguajes)

In [None]:
valores.count()

In [None]:
py

In [None]:
sp

In [None]:
funcionmap = valores.map(lambda x: (x,1))

In [None]:
contarcalores = valores.map(lambda x,y: x+y)

In [None]:
contarvalores.count()

In [None]:
contarvalores.sortBy(lambda x: x[1],ascending=False).take(5)

In [None]:
def lenguajes_map(linia):
    """Map a text line to ``("count", (has_python, has_spark))``.

    Each flag is 1 when the word appears in the line and 0 otherwise, so the
    pairs can be summed element-wise with reduceByKey to obtain the number of
    lines mentioning Python and the number mentioning Spark.
    """
    return ("count", (int("Python" in linia), int("Spark" in linia)))

mapfun = lines.map(lenguajes_map)

In [None]:
mapfun.count()

In [None]:
lines.count()

In [None]:
mapfun.reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1])).collect()

In [7]:
# Prepare the environment so pyspark can be imported.
# NOTE(review): the recorded run failed with "Unable to find py4j, your
# SPARK_HOME may not be configured correctly" — SPARK_HOME presumably needs to
# point to a valid Spark installation before this cell is run.
import findspark
findspark.init()

Exception: Unable to find py4j, your SPARK_HOME may not be configured correctly

In [5]:
# Minimal smoke test: start (or reuse) a SparkSession and run a trivial query.
import pyspark

from pyspark.sql import SparkSession

# getOrCreate returns the active session if there is one.
spark = SparkSession.builder.getOrCreate()

# One-row DataFrame with a single column `hello` containing 'spark'.
df = spark.sql("select 'spark' as hello ")

df.show()
# NOTE(review): the recorded run failed with "FileNotFoundError: [WinError 2]"
# — presumably an executable needed to launch Spark (e.g. Java or the Spark
# launcher) was not found; verify the local installation and PATH.

FileNotFoundError: [WinError 2] El sistema no puede encontrar el archivo especificado