In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
ls

In [None]:
import os

# Tell findspark/pyspark where the JDK (installed via apt in the setup cell) and
# the unpacked Spark distribution live. The /content prefix presumably is the
# Colab working directory the tarball was extracted into — confirm if run elsewhere.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.6-bin-hadoop2.7",
})

In [9]:
import time
import findspark
# Must run before any pyspark import so SPARK_HOME's python libs are on sys.path.
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Local mode using every available core; the driver runs in this process.
conf = (
    SparkConf()
    .setAppName('APP_NAME')
    .setMaster("local[*]")
    .set('spark.submit.deployMode', 'client')
)

# getOrCreate() reuses an active session/context, so this cell can be re-run.
# Calling SparkContext(conf=conf) a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# Spark SQL

In [None]:
# Japan COVID-19 totals. The first row is a header; schema inference is off,
# so every column is loaded as a string.
df = spark.read.csv(
    "datasets_covid_jpn_total.csv",
    header="true",
    inferSchema="false",
    sep=",",
)

In [None]:
# Import only the helpers this notebook uses. A star import would also shadow
# Python builtins such as sum, min, max, abs and round for every later cell.
from pyspark.sql.functions import col, row_number, when

# Per-Location count of non-null "positive" values, collected to the driver;
# display only the first resulting Row (group order is not guaranteed).
df1 = df.groupBy(col("Location")).agg({"positive":"count"}).collect()
df1[:1]

In [None]:
# Ascending by Date. Date is a string column here, so this is lexicographic order
# (chronological only if dates are zero-padded, e.g. yyyy-mm-dd — TODO confirm).
df.sort("Date").show()

In [None]:
from pyspark.sql import Window

# Number rows within each Date. Ordering by the partition key itself ('Date')
# ties every row in the partition and makes row_number() arbitrary between runs;
# ordering by Location makes it deterministic (assuming Date+Location is unique —
# TODO confirm).
w = Window.partitionBy('Date').orderBy('Location')
df2 = df.select(row_number().over(w).alias("orden"), col("Date"), col("Location"))

# First row of each date -> "True", second -> "False", any later row -> the string "Null".
df2.withColumn(
    "indicador",
    when(col("orden") == 1, "True").when(col("orden") == 2, "False").otherwise("Null"),
).show()

In [None]:
def functionFirst(df):
  """Number the rows of each Date and attach an "indicador" flag column.

  Args:
    df: Spark DataFrame with at least "Date" and "Location" columns.

  Returns:
    DataFrame with columns orden, Date, Location, indicador. indicador is
    "True" for the first row of each Date, "False" for the second and the
    string "Null" for every later row.
  """
  # Order inside each Date partition by Location: ordering by 'Date' (the
  # partition key) ties every row and leaves row_number() nondeterministic.
  w = Window.partitionBy('Date').orderBy('Location')
  numbered = df.select(row_number().over(w).alias("orden"), col("Date"), col("Location"))
  # withColumn returns a NEW DataFrame; it must be returned, otherwise the
  # caller gets the frame without the "indicador" column.
  return numbered.withColumn(
      "indicador",
      when(col("orden") == 1, "True").when(col("orden") == 2, "False").otherwise("Null"),
  )

In [None]:
# Apply the row-numbering helper to the COVID frame and preview 20 rows
# (the same count DataFrame.show() prints by default).
df3 = functionFirst(df=df)
df3.show(n=20)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Pull both columns in ONE Spark job so the two lists stay row-aligned.
# "Positive" must be cast to a number: the CSV was read with inferschema=false,
# so every column arrives as a string and bar heights would otherwise be text.
plot_pdf = df.select(
    col("Location"),
    col("Positive").cast("double").alias("Positive"),
).toPandas()
location = plot_pdf["Location"].tolist()
positive = plot_pdf["Positive"].tolist()

# One bar position per row.
y_pos = np.arange(len(location))

fig, ax = plt.subplots()
ax.bar(y_pos, positive, align='center')
ax.set_xticks(y_pos)
# Rotate labels: there is one tick per row, so horizontal labels overlap.
ax.set_xticklabels(location, rotation=90)
ax.set_xlabel('Location')
ax.set_ylabel('Positive')
ax.set_title('Curva de Contagio')

plt.show()

In [None]:
# Parallel processing with map & reduce: word count over a text file.
from operator import add

# spark.read.text yields one row per line with a single string column; keep just the text.
lines = spark.read.text("archivo1.txt").rdd.map(lambda r: r[0])
# str.split() with no argument splits on any whitespace run and drops empty
# strings; split(' ') would count '' as a word for blank lines or double spaces.
counts = lines.flatMap(lambda x: x.split()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)

output = counts.collect()
# Loop variable is not named `count`, which would clobber pyspark's count().
for (word, occurrences) in output:
    print("%s: %i" % (word, occurrences))
    

In [None]:
# Heart-disease case data, with schema inference so numeric columns get numeric types.
# NOTE: this rebinds `df`; the COVID DataFrame is no longer reachable by that name.
df = spark.read.csv(
    "Caso1 - Dataset enfermedad corazon.csv",
    header="true",
    inferSchema="true",
    sep=",",
)

df.show()

In [None]:
!pip install scorecardpy

In [None]:

import scorecardpy as sc
import pandas as pd
type(df)
dfPandas = df.toPandas()
dt_s = sc.var_filter(dfPandas, y="Flag_hipertension")
bins = sc.woebin(dt_s, y="Flag_hipertension")
bins

In [None]:
sc.woebin_plot(bins)

# Spark Streaming

In [None]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row

# Build the streaming context from the session's SparkContext instead of the
# global `sc` name, which is easy to shadow (e.g. by aliasing an import as `sc`).
ssc = StreamingContext(spark.sparkContext, 1)  # 1-second micro-batches

# Five RDDs holding 1..1000 in 10 partitions; queueStream consumes one per batch.
rddQueue = [ssc.sparkContext.parallelize(list(range(1, 1001)), 10) for _ in range(5)]

inputStream = ssc.queueStream(rddQueue)
# Per batch: how many numbers fall into each residue class modulo 10.
mappedStream = inputStream.map(lambda x: (x % 10, 1))
reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)
reducedStream.pprint()

ssc.start()
# Let the five 1-second batches run (plus a margin) before shutting down.
ssc.awaitTerminationOrTimeout(6)
# Stop only the streaming context: also stopping the SparkContext would make
# every later Spark cell (e.g. the MLlib correlations) fail on a stopped context.
ssc.stop(stopSparkContext=False, stopGraceFully=True)

# MLlib

In [10]:
# RDD-based MLlib pieces used by the correlation cell below.
from pyspark.mllib.util import MLUtils
from pyspark.mllib.stat import Statistics
from pyspark.mllib.regression import LabeledPoint

In [11]:
    # Load a LIBSVM-format file and print the correlation of the label with each feature.
    filepath = 'sample_linear_regression_data.txt'
    corrType = 'pearson'

    # Use the session's SparkContext rather than the global `sc` name, which other
    # cells can shadow or stop. Features are densified so they can be indexed
    # positionally, and the RDD is cached because count(), first() and one corr()
    # job per feature would otherwise re-read and re-parse the file each time.
    points = MLUtils.loadLibSVMFile(spark.sparkContext, filepath)\
        .map(lambda lp: LabeledPoint(lp.label, lp.features.toArray()))\
        .cache()

    numPoints = points.count()
    print()
    print('Summary of data file: ' + filepath)
    print('%d data points' % numPoints)

    # Statistics (correlations)
    print()
    print('Correlation (%s) between label and each feature' % corrType)
    print('Feature\tCorrelation')
    # An empty file has no first row to read the feature count from.
    numFeatures = points.first().features.size if numPoints else 0
    labelRDD = points.map(lambda lp: lp.label)
    for i in range(numFeatures):
        # Bind i as a default argument so each closure keeps its own feature index
        # regardless of when Spark serializes it.
        featureRDD = points.map(lambda lp, i=i: lp.features[i])
        corr = Statistics.corr(labelRDD, featureRDD, corrType)
        print('%d\t%g' % (i, corr))
    print()


Summary of data file: sample_linear_regression_data.txt
501 data points

Correlation (pearson) between label and each feature
Feature	Correlation
0	0.00595645
1	0.0332056
2	-0.0406646
3	0.123178
4	0.0240118
5	0.0648617
6	-0.0223995
7	-0.0279813
8	-0.0359889
9	0.0345207

