In [5]:
from pyspark.sql import SparkSession


In [6]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [16]:
# spark is an existing SparkSession
df = spark.read.json("C:/Users/Hemanth/Downloads/people.json")

In [17]:
# Displays the content of the DataFrame to stdout
df.show()

+---+----+--------------------+
|age|  id|                name|
+---+----+--------------------+
| 25|1201|              satish|
| 28|1202|<script>alert(657...|
| 39|1203|               amith|
| 23|1204|               javed|
| 23|1205|              prudvi|
| 23|1206|             Michael|
| 30|1207|                Andy|
| 19|1208|     CEHY104#&<(,+>;|
| 19|1208|               @@456|
| 19|1208|               @@456|
+---+----+--------------------+



In [18]:
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()

root
 |-- age: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



In [19]:
# Select only the "name" column
df.select("name").show()

+--------------------+
|                name|
+--------------------+
|              satish|
|<script>alert(657...|
|               amith|
|               javed|
|              prudvi|
|             Michael|
|                Andy|
|     CEHY104#&<(,+>;|
|               @@456|
|               @@456|
+--------------------+



In [20]:
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()

+--------------------+---------+
|                name|(age + 1)|
+--------------------+---------+
|              satish|     26.0|
|<script>alert(657...|     29.0|
|               amith|     40.0|
|               javed|     24.0|
|              prudvi|     24.0|
|             Michael|     24.0|
|                Andy|     31.0|
|     CEHY104#&<(,+>;|     20.0|
|               @@456|     20.0|
|               @@456|     20.0|
+--------------------+---------+



In [21]:
# Select people older than 21
df.filter(df['age'] > 21).show()

+---+----+--------------------+
|age|  id|                name|
+---+----+--------------------+
| 25|1201|              satish|
| 28|1202|<script>alert(657...|
| 39|1203|               amith|
| 23|1204|               javed|
| 23|1205|              prudvi|
| 23|1206|             Michael|
| 30|1207|                Andy|
+---+----+--------------------+



In [22]:
# Count people by age
df.groupBy("age").count().show()

+---+-----+
|age|count|
+---+-----+
| 30|    1|
| 28|    1|
| 19|    3|
| 23|    3|
| 25|    1|
| 39|    1|
+---+-----+



In [23]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+---+----+--------------------+
|age|  id|                name|
+---+----+--------------------+
| 25|1201|              satish|
| 28|1202|<script>alert(657...|
| 39|1203|               amith|
| 23|1204|               javed|
| 23|1205|              prudvi|
| 23|1206|             Michael|
| 30|1207|                Andy|
| 19|1208|     CEHY104#&<(,+>;|
| 19|1208|               @@456|
| 19|1208|               @@456|
+---+----+--------------------+



In [24]:
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()

+---+----+--------------------+
|age|  id|                name|
+---+----+--------------------+
| 25|1201|              satish|
| 28|1202|<script>alert(657...|
| 39|1203|               amith|
| 23|1204|               javed|
| 23|1205|              prudvi|
| 23|1206|             Michael|
| 30|1207|                Andy|
| 19|1208|     CEHY104#&<(,+>;|
| 19|1208|               @@456|
| 19|1208|               @@456|
+---+----+--------------------+



In [25]:
# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+---+----+--------------------+
|age|  id|                name|
+---+----+--------------------+
| 25|1201|              satish|
| 28|1202|<script>alert(657...|
| 39|1203|               amith|
| 23|1204|               javed|
| 23|1205|              prudvi|
| 23|1206|             Michael|
| 30|1207|                Andy|
| 19|1208|     CEHY104#&<(,+>;|
| 19|1208|               @@456|
| 19|1208|               @@456|
+---+----+--------------------+



In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType

In [3]:
spark = SparkSession.builder.appName('Python Spark SQL basic example').config('spark.some.config','some-value').getOrCreate()

In [5]:
df = spark.read.csv('C:/Users/Hemanth/Downloads/real_state (1).csv')

In [6]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string]>

In [8]:
df.show()

+--------------------+--------------+-----+-----+----+-----+------+-----------+------+
|                 _c0|           _c1|  _c2|  _c3| _c4|  _c5|   _c6|        _c7|   _c8|
+--------------------+--------------+-----+-----+----+-----+------+-----------+------+
|              street|          city|  zip|state|beds|baths|sq__ft|       type| price|
|        3526 HIGH ST|    SACRAMENTO|95838|   CA|   2|    1|   836|Residential| 59222|
|         51 OMAHA CT|    SACRAMENTO|95823|   CA|   3|    1|  1167|Residential| 68212|
|      2796 BRANCH ST|    SACRAMENTO|95815|   CA|   2|    1|   796|Residential| 68880|
|    2805 JANETTE WAY|    SACRAMENTO|95815|   CA|   2|    1|   852|Residential| 69307|
|     6001 MCMAHON DR|    SACRAMENTO|95824|   CA|   2|    1|   797|Residential| 81900|
|  5828 PEPPERMILL CT|    SACRAMENTO|95841|   CA|   3|    1|  1122|      Condo| 89921|
| 6048 OGDEN NASH WAY|    SACRAMENTO|95842|   CA|   3|    2|  1104|Residential| 90895|
|       2561 19TH AVE|    SACRAMENTO|95820|

In [22]:
Schema = StructType().add("Street",StringType(),True).add("City",StringType(),True).add("State",StringType(),True).add("Zip",StringType(),True).add("Beds",StringType(),True).add("Baths",StringType(),True).add("Sq_feet",StringType(),True).add("flat_type",StringType(),True).add("Price",StringType(),True)

In [23]:
df_with_schema = spark.read.format("csv").option("header", "true").load("C:/Users/Hemanth/Downloads/real_state (1).csv")
#.schema(schema)

In [24]:
df2 = spark.read.option("header",True).csv("C:/Users/Hemanth/Downloads/real_state (1).csv")

In [25]:
df2.printSchema()

root
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- state: string (nullable = true)
 |-- beds: string (nullable = true)
 |-- baths: string (nullable = true)
 |-- sq__ft: string (nullable = true)
 |-- type: string (nullable = true)
 |-- price: string (nullable = true)



In [26]:
df2.show()

+--------------------+--------------+-----+-----+----+-----+------+-----------+------+
|              street|          city|  zip|state|beds|baths|sq__ft|       type| price|
+--------------------+--------------+-----+-----+----+-----+------+-----------+------+
|        3526 HIGH ST|    SACRAMENTO|95838|   CA|   2|    1|   836|Residential| 59222|
|         51 OMAHA CT|    SACRAMENTO|95823|   CA|   3|    1|  1167|Residential| 68212|
|      2796 BRANCH ST|    SACRAMENTO|95815|   CA|   2|    1|   796|Residential| 68880|
|    2805 JANETTE WAY|    SACRAMENTO|95815|   CA|   2|    1|   852|Residential| 69307|
|     6001 MCMAHON DR|    SACRAMENTO|95824|   CA|   2|    1|   797|Residential| 81900|
|  5828 PEPPERMILL CT|    SACRAMENTO|95841|   CA|   3|    1|  1122|      Condo| 89921|
| 6048 OGDEN NASH WAY|    SACRAMENTO|95842|   CA|   3|    2|  1104|Residential| 90895|
|       2561 19TH AVE|    SACRAMENTO|95820|   CA|   3|    1|  1177|Residential| 91002|
|11150 TRINITY RIV...|RANCHO CORDOVA|95670|

 # Aggregate functions

In [5]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [7]:
simpleData = [("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]

schema = ["employee_name","department","salary"]
df = spark.createDataFrame(data=simpleData,schema=schema)
df.printSchema()


root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)



In [11]:
df.show()

Py4JJavaError: An error occurred while calling o47.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (Hemanth.mshome.net executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:492)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:445)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2728)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2935)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:326)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
	... 29 more
