In [1]:
import pyspark
import pandas as pd

In [2]:
from pyspark.sql import SparkSession

In [9]:
from pyspark.sql import *
from pyspark.mllib.stat import Statistics

In [10]:
# Start (or reuse) the Spark session that backs every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName('Diabetes')
    .getOrCreate()
)

In [11]:
# Load the raw CSV; the first row holds the column names and types are inferred.
df = spark.read.options(header=True, inferSchema=True).csv('diabetes.csv')

In [12]:
# Print the column names and the types produced by inferSchema when the CSV was read.
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [14]:
# Disabled: pyspark.mllib's Statistics.corr expects an RDD of Vectors, not a DataFrame.
#print(Statistics.corr(df, method="pearson"))

In [15]:
# Disabled: pyspark.mllib's Statistics.colStats expects an RDD of Vectors, not a DataFrame.
#summary = Statistics.colStats(df)
#print(summary.mean())  # a dense vector containing the mean value for each column
#print(summary.variance())  # column-wise variance
#print(summary.numNonzeros())  # number of nonzero entries in each column

In [17]:
# Row count of the full dataset, taken before the sampling and filtering cells below.
total=df.count()

In [None]:
# Preview five rows whose Outcome is 1.
df.where(df['Outcome'] == 1).show(5)

In [None]:
# Summary statistics, transposed so that each original column becomes one row.
df.describe().toPandas().transpose()

In [None]:
# Class balance: number of rows per Outcome value.
outcome_counts = df.groupBy('Outcome').count()
outcome_counts.show()

In [None]:
df=df.sample(.7500).collect()
len(df)

In [None]:
 
df = spark.createDataFrame(data=df)
#rdd = spark.sparkContext.parallelize(df5)
df.show()

In [None]:
# Show the sampled data as a pandas frame.
# The previous version referenced `df5`, which is never defined in this notebook
# and raised NameError on a fresh kernel.
df.toPandas()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Class balance again, this time on the 75% sample.
sampled_outcome_counts = df.groupBy(df['Outcome']).count()
sampled_outcome_counts.show()


In [None]:
# Overlay, for every column, the distribution of Outcome == 1 rows on Outcome == 0 rows.
# The frame is converted to pandas once and split there; the previous version
# launched two Spark jobs per column inside the loop.
df_pd = df.toPandas()
df_pd_pos = df_pd[df_pd['Outcome'] == 1]
df_pd_neg = df_pd[df_pd['Outcome'] == 0]
# The loop variable is not called `col` so it cannot clobber pyspark.sql.functions.col,
# which a later cell calls.
for column in df.columns[0:10]:
    print(column)
    df_pd_pos[column].hist(alpha=.7, label='Outcome = 1')
    df_pd_neg[column].hist(alpha=.7, label='Outcome = 0')
    plt.legend()
    plt.show()
    plt.close()

In [None]:
# Subset 2: rows with Glucose above 145.
df2 = df.where(df['Glucose'] > 145)
print(df2.count())

In [None]:
# Per-column Outcome == 1 vs Outcome == 0 histograms for the Glucose > 145 subset.
# The frame is converted to pandas once and split there; the previous version
# launched two Spark jobs per column inside the loop.
df2_pd = df2.toPandas()
df2_pd_pos = df2_pd[df2_pd['Outcome'] == 1]
df2_pd_neg = df2_pd[df2_pd['Outcome'] == 0]
# The loop variable is not called `col` so it cannot clobber pyspark.sql.functions.col,
# which a later cell calls.
for column in df2.columns[0:10]:
    print(column)
    df2_pd_pos[column].hist(alpha=.7, label='Outcome = 1')
    df2_pd_neg[column].hist(alpha=.7, label='Outcome = 0')
    plt.legend()
    plt.show()
    plt.close()

In [None]:
# Subset 3: rows that are NOT high-glucose (Glucose > 145) and have Age below 28.
high_glucose = df['Glucose'] > 145
younger_than_28 = df['Age'] < 28
df3 = df.where(~high_glucose & younger_than_28)
print(df3.count())

In [None]:
# Per-column Outcome == 1 vs Outcome == 0 histograms for the df3 subset.
# The frame is converted to pandas once and split there; the previous version
# launched two Spark jobs per column inside the loop.
df3_pd = df3.toPandas()
df3_pd_pos = df3_pd[df3_pd['Outcome'] == 1]
df3_pd_neg = df3_pd[df3_pd['Outcome'] == 0]
# The loop variable is not called `col` so it cannot clobber pyspark.sql.functions.col,
# which a later cell calls.
for column in df3.columns[0:10]:
    print(column)
    df3_pd_pos[column].hist(alpha=.7, label='Outcome = 1')
    df3_pd_neg[column].hist(alpha=.7, label='Outcome = 0')
    plt.legend()
    plt.show()
    plt.close()

In [None]:
# Subset 4: the complement of "Glucose <= 145 and Age < 28", i.e. rows with
# Glucose > 145 OR Age >= 28 (De Morgan form of the same predicate).
# NOTE(review): this includes every Glucose > 145 row, so it overlaps the
# Glucose > 145 subset built earlier - confirm that this is intended.
high_glucose = df['Glucose'] > 145
younger_than_28 = df['Age'] < 28
df4 = df.where(high_glucose | ~younger_than_28)
print(df4.count())

In [None]:
# Per-column Outcome == 1 vs Outcome == 0 histograms for the df4 subset.
# The frame is converted to pandas once and split there; the previous version
# launched two Spark jobs per column inside the loop.
df4_pd = df4.toPandas()
df4_pd_pos = df4_pd[df4_pd['Outcome'] == 1]
df4_pd_neg = df4_pd[df4_pd['Outcome'] == 0]
# The loop variable is not called `col` so it cannot clobber pyspark.sql.functions.col,
# which a later cell calls.
for column in df4.columns[0:10]:
    print(column)
    df4_pd_pos[column].hist(alpha=.7, label='Outcome = 1')
    df4_pd_neg[column].hist(alpha=.7, label='Outcome = 0')
    plt.legend()
    plt.show()
    plt.close()

In [None]:
from pyspark.sql.functions import *


In [None]:
# Count missing values per column.
# Both NaN and SQL NULL are counted: blank CSV fields are loaded as NULL,
# which isnan() alone never matches, so the previous check could report 0
# for a column that actually has gaps.
df.select([
    count(when(isnan(c) | df[c].isNull(), c)).alias(c)
    for c in df.columns
]).toPandas()

# UDF

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [None]:
# Map the numeric label to text: 0 -> 'no', anything else -> 'yes'.
# A null label stays null; the previous lambda sent None down the else branch
# and silently labelled it 'yes'.
y_udf = udf(
    lambda y: None if y is None else ('no' if y == 0 else 'yes'),
    StringType(),
)

In [None]:
# Replace the numeric Outcome column with its 'yes'/'no' text form.
# The column is referenced as 'Outcome' in both places; the previous version read
# 'outcome', which only resolves while Spark's column matching is case-insensitive.
# NOTE(review): 'hasDiadetes' looks like a typo for 'hasDiabetes'; the name is kept
# unchanged because later code may already refer to it.
df = df.withColumn('hasDiadetes', y_udf('Outcome')).drop('Outcome')

In [None]:
# List the column names to check that 'hasDiadetes' has replaced 'Outcome'.
df.columns

In [None]:
def udf_multi(age):
    """Return the age-group label for ``age``.

    Groups: ``'under 25'`` (age <= 25), ``'25-35'`` (25 < age <= 35),
    ``'36-50'`` (35 < age <= 50) and ``'> 50'``.

    Returns ``'NaN'`` when ``age`` is ``None`` or a float NaN. The previous
    version raised ``TypeError`` on ``None`` (which is how Spark passes a null
    to a Python UDF), so its ``'NaN'`` fallback could never catch nulls.
    """
    if age is None:
        return 'NaN'
    # Each guard implies the lower bound of its bucket, because the earlier
    # guards have already returned for smaller ages.
    if age <= 25:
        return 'under 25'
    if age <= 35:
        return '25-35'
    if age <= 50:
        return '36-50'
    if age > 50:
        return '> 50'
    # Only reached when every comparison is False, i.e. for NaN.
    return 'NaN'


In [None]:
# Wrap the age-bucketing function as a string-returning Spark UDF and
# attach its result as the 'age_udf' column.
age_udf = udf(udf_multi, returnType=StringType())
df = df.withColumn('age_udf', age_udf(df['Age']))

In [None]:
# Preview the first five rows, which now include the derived 'age_udf' column.
df.show(5)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import window

In [None]:
# Window frame that spans every row (unbounded in both directions); used below
# to compute a grand total next to each group's own count.
# Note: this rebinds `window`, which the import cell above bound to the
# pyspark.sql.window module.
window = Window.rowsBetween(
    start=Window.unboundedPreceding,
    end=Window.unboundedFollowing,
)

In [None]:
# Echo the window spec object built in the previous cell.
window

In [None]:
# Per-age-group Glucose summary: row count, mean/min/max Glucose, and each
# group's share of all rows (Percent), sorted by that share, largest first.
# Every Spark function is called through `F`: the previous version used the
# star-imported `sum`, `col` and `desc`, where `sum` silently replaces the Python
# builtin and `col` is overwritten whenever a `for col in ...` cell is re-run.
age_group_tab = (
    df.select('age_udf', 'Glucose')
    .groupBy('age_udf')
    .agg(
        F.count('Glucose').alias('UserCount'),
        F.mean('Glucose').alias('Glucose_AVG'),
        F.min('Glucose').alias('Glucose_MIN'),
        F.max('Glucose').alias('Glucose_MAX'),
    )
    # Grand total over the all-rows window, needed only to derive Percent.
    .withColumn('total', F.sum(F.col('UserCount')).over(window))
    .withColumn('Percent', F.col('UserCount') * 100 / F.col('total'))
    .drop('total')
    .sort(F.desc('Percent'))
)

In [None]:
# Display the per-age-group summary table.
age_group_tab.show()

In [None]:
# Bar chart of the per-age-group statistics, one cluster of bars per age group.
# The previous default line plot used the bare row index as x, so the age-group
# labels (a string column, which pandas does not plot) were lost.
ax = age_group_tab.toPandas().set_index('age_udf').plot.bar(rot=0)
ax.set(xlabel='Age group', title='Glucose statistics by age group')
plt.show()