In [30]:
from pyspark.sql import SparkSession

In [31]:
import pandas as pd
import numpy as np

In [32]:
spark = SparkSession.builder.appName("DataFrame").getOrCreate()

In [33]:
data=spark.read.format('csv').\
option ('inferSchema','true').\
option ('header','true').\
option ('path','data.csv').\
load()
data.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



# Data Processing

In [34]:
data.columns

['Name', 'Age', 'Experience']

In [35]:
data.select('Name').show()

+----+
|Name|
+----+
|   A|
|   B|
|   C|
+----+



In [36]:
data.select(['name', 'Experience', ]).show()

+----+----------+
|name|Experience|
+----+----------+
|   A|        10|
|   B|         8|
|   C|         4|
+----+----------+



In [37]:
data.describe()

DataFrame[summary: string, Name: string, Age: string, Experience: string]

In [38]:
data.describe().show()

+-------+----+----+-----------------+
|summary|Name| Age|       Experience|
+-------+----+----+-----------------+
|  count|   3|   3|                3|
|   mean|null|30.0|7.333333333333333|
| stddev|null| 1.0|3.055050463303893|
|    min|   A|  29|                4|
|    max|   C|  31|               10|
+-------+----+----+-----------------+



In [39]:
data.withColumn('after 2 year',data['Experience']+2).show()

+----+---+----------+------------+
|Name|Age|Experience|after 2 year|
+----+---+----------+------------+
|   A| 31|        10|          12|
|   B| 30|         8|          10|
|   C| 29|         4|           6|
+----+---+----------+------------+



In [40]:
data.withColumn('Gender',lit('M')).show()

+----+---+----------+------+
|Name|Age|Experience|Gender|
+----+---+----------+------+
|   A| 31|        10|     M|
|   B| 30|         8|     M|
|   C| 29|         4|     M|
+----+---+----------+------+



In [59]:
from pyspark.sql.functions import lit

In [60]:
data.withColumn("Gender", lit('M')) \
  .show()

+---------------+--------------+-----------------+----------+------+
|     ip_address|       Country|      Domain Name|Bytes_used|Gender|
+---------------+--------------+-----------------+----------+------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|     M|
| 119.239.207.13|         China|         youtu.be|        51|     M|
|  68.69.217.210|         China|        adobe.com|        10|     M|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|     M|
|   211.13.10.68|     Indonesia|          hud.gov|        29|     M|
|   239.80.21.97|      Suriname|       smh.com.au|       218|     M|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|     M|
| 127.242.24.138|         China| surveymonkey.com|       123|     M|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|     M|
|   237.54.11.63|         China|       amazon.com|        83|     M|
| 252.141.157.25|         Japan|      cornell.edu|       374|     M|
|185.220.128.248|       Belgium|  

In [61]:
data=spark.read.format('csv').\
option ('inferSchema','true').\
option ('header','true').\
option ('path','assignment 2 - Pyspark.csv').\
load()
data.printSchema()

root
 |-- ip_address: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Domain Name: string (nullable = true)
 |-- Bytes_used: integer (nullable = true)



In [62]:
data.select('Country').show()

+--------------+
|       Country|
+--------------+
|         China|
|         China|
|         China|
|      Bulgaria|
|     Indonesia|
|      Suriname|
|       Jamaica|
|         China|
|Czech Republic|
|         China|
|         Japan|
|       Belgium|
|   Afghanistan|
|     Indonesia|
|Czech Republic|
|     Indonesia|
|        Mexico|
|       Croatia|
|      Thailand|
|      Thailand|
+--------------+
only showing top 20 rows



In [63]:
import pyspark.sql.functions as sql
from pyspark.sql.functions import *
df=data.withColumn("Country1", when(data.Country== "Mexico" , "Yes").otherwise("No"))
df.show()

+---------------+--------------+-----------------+----------+--------+
|     ip_address|       Country|      Domain Name|Bytes_used|Country1|
+---------------+--------------+-----------------+----------+--------+
|  52.81.192.172|         China| odnoklassniki.ru|       463|      No|
| 119.239.207.13|         China|         youtu.be|        51|      No|
|  68.69.217.210|         China|        adobe.com|        10|      No|
|   7.191.21.223|      Bulgaria|     linkedin.com|       853|      No|
|   211.13.10.68|     Indonesia|          hud.gov|        29|      No|
|   239.80.21.97|      Suriname|       smh.com.au|       218|      No|
|106.214.106.233|       Jamaica|    amazonaws.com|        95|      No|
| 127.242.24.138|         China| surveymonkey.com|       123|      No|
|     99.2.6.139|Czech Republic|     geocities.jp|       322|      No|
|   237.54.11.63|         China|       amazon.com|        83|      No|
| 252.141.157.25|         Japan|      cornell.edu|       374|      No|
|185.2

In [64]:
df1 = df.groupBy('Country1').agg(sum('Bytes_used').alias('total_bytes'))

In [65]:
df1.show()

+--------+-----------+
|Country1|total_bytes|
+--------+-----------+
|      No|     508076|
|     Yes|       6293|
+--------+-----------+



In [66]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct
import pyspark.sql.functions as sql
from pyspark.sql.functions import *

# Assuming you already have a SparkSession created and a DataFrame named 'df'
# spark = SparkSession.builder.appName("example").getOrCreate()
# df = spark.read...

df_result = df.groupBy('Country1').agg(countDistinct('ip_address').alias('distinct_ip_count'))

# Show or perform further actions on df_result as needed
df_result.show()



+--------+-----------------+
|Country1|distinct_ip_count|
+--------+-----------------+
|      No|              987|
|     Yes|               13|
+--------+-----------------+



In [67]:
df_result.orderBy(col('distinct_ip_count').desc())


DataFrame[Country1: string, distinct_ip_count: bigint]

In [68]:
df_result.show()

+--------+-----------------+
|Country1|distinct_ip_count|
+--------+-----------------+
|      No|              987|
|     Yes|               13|
+--------+-----------------+



In [69]:
import pandas as pd
import numpy as np

# user defined functions in Pyspark

In [84]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

In [85]:
spark=SparkSession.builder.appName("DataFrame").getOrCreate()

In [86]:
data= spark.read.format('csv').\
option('inferSchema','true').\
option('header','true').\
option('path','file1.csv').\
load()
data.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- bonus: integer (nullable = true)



In [92]:
data.show()

+---+--------+------+-----+
| id|    name|salary|bonus|
+---+--------+------+-----+
|  1|manpreet| 50000|  200|
|  2|   payal| 80000|  300|
+---+--------+------+-----+



In [96]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def totpay(s, b):
    return s + b

TotalPayment = udf(lambda salary, bonus: totpay(salary, bonus), IntegerType())


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

# Assuming you already have a SparkSession created and a DataFrame named 'df'
# spark = SparkSession.builder.appName("example").getOrCreate()
# df = spark.read...

def totpay(s, b):
    return s + b

TotalPayment = udf(lambda salary, bonus: totpay(salary, bonus), IntegerType())

# Add a new column 'TotalPay' to the DataFrame
result = data.withColumn('TotalPay', TotalPayment(data.salary, data.bonus))

# Show the resulting DataFrame



In [None]:
df_pandas = result.toPandas()
print(df_pandas.head())