<a href="https://colab.research.google.com/github/andreidore/machine_learning/blob/main/pyspark/pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://mirror.efect.ro/apache/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
spark

In [5]:
df=spark.read.json("people.json")

In [6]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [7]:
df.columns

['age', 'name']

In [8]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [9]:
df.select("age").show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [10]:
df.withColumn("new_age",df["age"]*2).show()

+----+-------+-------+
| age|   name|new_age|
+----+-------+-------+
|null|Michael|   null|
|  30|   Andy|     60|
|  19| Justin|     38|
+----+-------+-------+



In [11]:
df.createOrReplaceTempView("people")

In [12]:
results=spark.sql("select * from people")
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [13]:
results=spark.sql("select * from people where age=19")
results.show()

+---+------+
|age|  name|
+---+------+
| 19|Justin|
+---+------+



In [14]:
df=spark.read.csv("appl_stock.csv")

In [15]:
df.head(5)

[Row(_c0='Date', _c1='Open', _c2='High', _c3='Low', _c4='Close', _c5='Volume', _c6='Adj Close'),
 Row(_c0='2010-01-04', _c1='213.429998', _c2='214.499996', _c3='212.38000099999996', _c4='214.009998', _c5='123432400', _c6='27.727039'),
 Row(_c0='2010-01-05', _c1='214.599998', _c2='215.589994', _c3='213.249994', _c4='214.379993', _c5='150476200', _c6='27.774976000000002'),
 Row(_c0='2010-01-06', _c1='214.379993', _c2='215.23', _c3='210.750004', _c4='210.969995', _c5='138040000', _c6='27.333178000000004'),
 Row(_c0='2010-01-07', _c1='211.75', _c2='212.000006', _c3='209.050005', _c4='210.58', _c5='119282800', _c6='27.28265')]

In [16]:
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|       _c0|               _c1|               _c2|               _c3|               _c4|      _c5|               _c6|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06

In [17]:
df = spark.read.csv("appl_stock.csv",inferSchema=True,header=True)

In [18]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [19]:
df.head(3)[0]

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [21]:
df.filter("Close<500").select("Open").show()

+------------------+
|              Open|
+------------------+
|        213.429998|
|        214.599998|
|        214.379993|
|            211.75|
|        210.299994|
|212.79999700000002|
|209.18999499999998|
|        207.870005|
|210.11000299999998|
|210.92999500000002|
|        208.330002|
|        214.910006|
|        212.079994|
|206.78000600000001|
|202.51000200000001|
|205.95000100000001|
|        206.849995|
|        204.930004|
|        201.079996|
|192.36999699999998|
+------------------+
only showing top 20 rows



In [23]:
df.filter(df["Close"]<500).select("Close").show()

+------------------+
|             Close|
+------------------+
|        214.009998|
|        214.379993|
|        210.969995|
|            210.58|
|211.98000499999998|
|210.11000299999998|
|        207.720001|
|        210.650002|
|            209.43|
|            205.93|
|        215.039995|
|            211.73|
|        208.069996|
|            197.75|
|        203.070002|
|        205.940001|
|        207.880005|
|        199.289995|
|        192.060003|
|        194.729998|
+------------------+
only showing top 20 rows



In [25]:
df.filter((df["Close"]<500) & (df["Close"]>300)).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-10-13 00:00:00|        300.200008|        301.959995|        299.799999|300.14000699999997|157523100|         38.886005|
|2010-10-14 00:00:00|        301.690002|        302.469994|        300.400013|302.30998999999997|108824100|         39.167147|
|2010-10-15 00:00:00|307.43998700000003|             315.0|        304.909996|314.73999399999997|230548500|         40.777572|
|2010-10-18 00:00:00|        318.470013|        319.000011|        314.289997|        317.999989|273252700|         41.199936|
|2010-10-19 00:00:00|303.40000200000003|        313.770012|300.02000400000003|309.48999399999997|308196000|40.0

In [26]:
df=spark.read.csv("sales_info.csv",inferSchema=True, header=True)

In [27]:
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+

