In [None]:
!pip install pyspark

import pyspark
from pyspark.sql import SparkSession

!wget https://raw.githubusercontent.com/neelamdoshi/Spark_neelam/main/diabetes.csv


# Start (or reuse) a Spark session and load the diabetes CSV.
# header=True takes the column names from the first row; inferSchema=True
# asks Spark to infer column types instead of reading every field as a string.
spark = SparkSession.builder.appName('deeplearn').getOrCreate()
df = spark.read.csv('diabetes.csv', header=True, inferSchema=True)


# Preview the rows and the inferred schema.
df.show()
df.printSchema()

# Report the dimensions of the data as (rows, columns).
# A bare expression that is not the last statement of a notebook cell is
# evaluated and thrown away, so the tuple is printed explicitly.
print((df.count(), len(df.columns)))

from pyspark.sql.functions import col, when

# Work on a separate name so the raw DataFrame `df` stays untouched.
# (`my_data1` was previously used without ever being assigned -> NameError.)
my_data1 = df

# Numeric columns are the candidates for imputation.
numeric_cols = [col_name for col_name, dtype in my_data1.dtypes
                if dtype in ('int', 'double', 'float')]
# Outcome is the label, not a feature: never impute it.
if 'Outcome' in numeric_cols:
    numeric_cols.remove('Outcome')

for col_name in numeric_cols:
    # Zeros are the placeholders being replaced, so they must not take part in
    # the median; otherwise the fill value is dragged towards zero.
    observed = my_data1.filter(col(col_name) != 0)
    quantiles = observed.approxQuantile(col_name, [0.5], 0.01)
    if not quantiles:
        # No non-zero, non-null value to derive a median from; leave as is.
        print(f"Median of {col_name}: unavailable, column left unchanged")
        continue
    median = quantiles[0]
    print(f"Median of {col_name}: {median}")
    my_data1 = my_data1.withColumn(
        col_name,
        when(col(col_name) == 0, median).otherwise(col(col_name)),
    )


my_data1.show(5)


my_data1.show()

from pyspark.ml.feature import VectorAssembler

# Build the assembler: every predictor column is packed, in this order, into a
# single vector column named 'features'.
assembler = VectorAssembler(outputCol='features')
assembler.setInputCols([
    'Pregnancies',
    'Glucose',
    'BloodPressure',
    'SkinThickness',
    'Insulin',
    'BMI',
    'DiabetesPedigreeFunction',
    'Age',
])

# Append the 'features' column to the imputed data.
final_data = assembler.transform(my_data1)

# Inspect the assembled vector next to the label.
final_data.select("features", "Outcome").show(5)

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Random 70/30 split into training and hold-out rows (no seed is set, so the
# split differs from run to run).
xtrain, xtest = final_data.randomSplit([0.7, 0.3])

# Fit a logistic regression on the training rows.
lr = LogisticRegression(featuresCol='features', labelCol='Outcome', maxIter=10000)
lrModel = lr.fit(xtrain)

# Score the hold-out rows and preview the predictions.
predictions = lrModel.transform(xtest)
predictions.show(5)

# No metricName is given, so the evaluator reports its default metric (F1 per
# the PySpark documentation).  Left as the last expression so the notebook
# displays the value.
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome")
evaluator.evaluate(predictions)

In [None]:
#Pipeline
!pip install pyspark

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import col, when

!wget https://raw.githubusercontent.com/neelamdoshi/Spark_neelam/main/diabetes.csv


# Start (or reuse) the Spark session, then read the CSV with a header row and
# inferred column types.
spark = SparkSession.builder.appName('deeplearn').getOrCreate()
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv('diabetes.csv')

# Preview the rows and the inferred schema.
df.show()
df.printSchema()

def stage_1(x):
    """Replace zero placeholders in the numeric feature columns with the column median.

    Every column whose Spark dtype is 'int', 'double' or 'float' is treated as
    a feature, except the label column 'Outcome'.  For each such column the
    approximate median of its non-zero values is computed and written in
    place of every zero.

    Args:
        x: Spark DataFrame to clean.

    Returns:
        A ``(dataframe, numeric_cols)`` tuple: the DataFrame with zeros
        imputed, and the list of feature column names that were considered.
    """
    # Imported locally so the function does not depend on module-level names
    # (`col`, `when`) that other notebook cells may rebind.
    from pyspark.sql import functions as F

    # Numeric feature columns; the label is excluded so it is never imputed.
    numeric_cols = [col_name for col_name, dtype in x.dtypes
                    if dtype in ('int', 'double', 'float') and col_name != 'Outcome']
    print("Numeric Columns:", numeric_cols)

    for col_name in numeric_cols:
        # Zeros are the values being replaced, so keep them (and nulls, which
        # the comparison also filters out) away from the median calculation.
        observed = x.filter(F.col(col_name) != 0)
        quantiles = observed.approxQuantile(col_name, [0.5], 0.01)
        if not quantiles:
            # Nothing to derive a median from (column is all zero/null):
            # leave the column unchanged instead of raising IndexError.
            continue
        median = quantiles[0]
        x = x.withColumn(
            col_name,
            F.when(F.col(col_name) == 0, median).otherwise(F.col(col_name)),
        )

    return x, numeric_cols

# Clean the raw data; stage_1 also returns the names of the feature columns.
stage1, featuresCol = stage_1(df)
stage1.show()

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Logistic Regression stage: predicts 'Outcome' from the 'features' vector.
stage3 = LogisticRegression(featuresCol='features', labelCol='Outcome', maxIter=100)

# Vector Assembler stage: packs the feature columns into 'features'.
stage2 = VectorAssembler(inputCols=featuresCol, outputCol='features')

# Create the pipeline: assemble the features first, then fit the classifier.
regression_pipeline = Pipeline().setStages([stage2, stage3])

# Fit the pipeline on the cleaned data.
model = regression_pipeline.fit(stage1)

# Apply the fitted pipeline to the same data to obtain predictions.
model_train = model.transform(stage1)

model_train.select('features', 'Outcome', 'prediction').show()

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# The evaluator's default metric is F1, so request accuracy explicitly;
# otherwise the value stored in (and printed as) `accuracy` is an F1 score.
evaluator = MulticlassClassificationEvaluator(
    labelCol="Outcome",
    predictionCol="prediction",
    metricName="accuracy",
)
# NOTE(review): `model_train` appears to hold predictions for the rows the
# pipeline was fitted on, so this is a training-set score - confirm intent.
accuracy = evaluator.evaluate(model_train)
print(accuracy)

#diff
!pip install pyspark

# Import necessary modules
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType


from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.types as tp

# Create a Spark session
spark = SparkSession.builder \
    .appName("DiabetesPredictionPipeline") \
    .getOrCreate()

# Define the schema for the data up front, so column types are fixed rather
# than inferred.  Every column is nullable because zeros are later turned
# into nulls for imputation.
my_schema = tp.StructType([
    tp.StructField(name='Pregnancies', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Glucose', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='BloodPressure', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='SkinThickness', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Insulin', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='BMI', dataType=tp.DoubleType(), nullable=True),
    tp.StructField(name='DiabetesPedigreeFunction', dataType=tp.DoubleType(), nullable=True),
    tp.StructField(name='Age', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Outcome', dataType=tp.IntegerType(), nullable=True)
])

# Read the CSV once, with the defined schema.  The file was previously read
# three times (once from a different absolute path and without the schema),
# and each earlier result was immediately overwritten.
my_data = spark.read.csv('diabetes.csv', schema=my_schema, header=True)

# Print the schema
my_data.printSchema()

# Convert zeros to nulls so a downstream imputer can treat them as missing.
from pyspark.sql.functions import when

# Columns in which a zero is replaced with null
cols_to_check = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']

# Replace zeros with nulls.  The loop variable is deliberately not called
# `col`: that would rebind the module-level name imported from
# pyspark.sql.functions and break any later call to col(...).
for col_name in cols_to_check:
    my_data = my_data.withColumn(
        col_name,
        when(my_data[col_name] == 0, None).otherwise(my_data[col_name]),
    )

# Now you can proceed with the rest of your pipeline


# Stage 1: median imputation.  Each input column gets a companion
# "<name>_imputed" output column; the original columns are left in place.
imputer = Imputer(
    strategy="median",
    inputCols=['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI'],
    outputCols=['Glucose_imputed', 'BloodPressure_imputed', 'SkinThickness_imputed',
                'Insulin_imputed', 'BMI_imputed'],
)

# Stage 2: pack the predictors into 'features', using the imputed version of
# every column that went through the imputer.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=[
        'Pregnancies',
        'Glucose_imputed',
        'BloodPressure_imputed',
        'SkinThickness_imputed',
        'Insulin_imputed',
        'BMI_imputed',
        'DiabetesPedigreeFunction',
        'Age',
    ],
)

# Stage 3: logistic regression on the assembled vector.
lr = LogisticRegression(featuresCol='features', labelCol='Outcome', maxIter=10)

# Chain the three stages in execution order.
pipeline = Pipeline(stages=[imputer, assembler, lr])

# Random 80/20 split into training and test rows (unseeded).
xtrain, xtest = my_data.randomSplit([0.8, 0.2])

# Fit all pipeline stages on the training rows only.
pipeline_model = pipeline.fit(xtrain)

# Run the fitted pipeline over the held-out rows.
predictions = pipeline_model.transform(xtest)

# Score the predictions; the metric is overridden to accuracy for this single
# evaluate() call through a param map.
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction")
metric_override = {evaluator.metricName: "accuracy"}
accuracy = evaluator.evaluate(predictions, metric_override)

print(f"Accuracy: {accuracy}")

# Release the Spark session now that the run is finished.
spark.stop()

In [None]:
#CRUD

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Obtain the Spark session used by the CRUD walkthrough.
spark = SparkSession.builder.appName("CRUD Operations").getOrCreate()

# Column names for the simulated table, followed by its rows as
# (id, name, age) tuples.
columns = ["id", "name", "age"]
data = [
    (1, "Anil", 21),
    (2, "Asad", 23),
    (3, "Deep", 22),
    (4, "Danish", 25),
]

# 1. CREATE - materialise the rows as a DataFrame and display it.
df = spark.createDataFrame(data=data, schema=columns)
print("Initial Table:")
df.show()

# 2. READ - Read from the table (select specific columns)
print("Read Operation (Select 'name' and 'age'):")
df.select("name", "age").show()

# 3. UPDATE - Set the 'age' of the person with 'id' = 2 (Asad) to 28.
# DataFrames are immutable, so the update yields a new DataFrame.
# The message now names the person and age the code actually uses.
print("Update Asad's age to 28:")
df_update = df.withColumn("age", when(col("id") == 2, 28).otherwise(col("age")))
df_update.show()

# 4. DELETE - Drop the row whose name is 'Danish' by keeping every other row.
print("Delete operation (Remove Danish):")
df_delete = df_update.filter(col("name") != "Danish")
df_delete.show()

# Stopping the Spark session
spark.stop()

import pyspark
from pyspark.sql import SparkSession

# Single-threaded local session for the column-extraction examples.
session_builder = SparkSession.builder.master("local[1]")
spark = session_builder.appName('SparkByExamples.com').getOrCreate()

# Sample rows: (firstname, lastname, country, state).
data = [
    ("James", "Smith", "IND", "MH"),
    ("Michael", "Rose", "IND", "MP"),
    ("Robert", "Williams", "IND", "UP"),
    ("Maria", "Jones", "IND", "TN"),
]
columns = ["firstname", "lastname", "country", "state"]

df = spark.createDataFrame(data, columns)
df.show()

# collect() brings every row back to the driver as a list of Row objects.
print(df.collect())


from collections import OrderedDict

# Example 1: pull the 4th field (index 3, the state) out of every row.
states1 = df.rdd.map(lambda row: row[3]).collect()
print(states1)

# Drop duplicates while keeping first-seen order: the keys of an ordered
# mapping are unique and remember insertion order.
unique_in_order = OrderedDict.fromkeys(states1)
res = list(unique_in_order.keys())
print(res)



# Example 2: access the field by attribute name instead of position.
states2 = df.rdd.map(lambda row: row.state).collect()
print(states2)

# Example 3: select the column and collect it; the result is a list of Row
# objects rather than plain values.
state_column = df.select(df.state)
states3 = state_column.collect()
print(states3)

# Example 4: flatten each single-field Row so the result is a list of values.
states4 = df.select(df.state).rdd.flatMap(lambda row: row).collect()
print(states4)

# Example 5: go through pandas and turn the resulting Series into a list.
states5 = df.select(df.state).toPandas()['state']
states6 = list(states5)
print(states6)

# Example 6: convert two columns at once, then list each of them.
pandDF = df.select(df.state, df.firstname).toPandas()
print(list(pandDF['state']))
print(list(pandDF['firstname']))
