# Project Name - Stock Prices

### The goal is to research stock prices

#### Data is located at the link /kaggle/input/usstockprices/stocks_price_final.csv

In [17]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/usstockprices/stocks_price_final.csv


In [18]:
!pip install pyspark
!pip install pyarrow



In [19]:
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .getOrCreate()

In [20]:
# read data structure
path = '/kaggle/input/usstockprices/stocks_price_final.csv'
df = spark.read.csv(
    path,
    sep=',',
    header=True,
)

df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- adjusted: string (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



In [21]:
# rename 'market.cap'
df = df.withColumnRenamed('market.cap', 'market_cap')
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- adjusted: string (nullable = true)
 |-- market_cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



In [22]:
# read df
df.toPandas()

23/12/18 16:21:23 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , symbol, date, open, high, low, close, volume, adjusted, market.cap, sector, industry, exchange
 Schema: _c0, symbol, date, open, high, low, close, volume, adjusted, market.cap, sector, industry, exchange
Expected: _c0 but found: 
CSV file: file:///kaggle/input/usstockprices/stocks_price_final.csv
                                                                                

Unnamed: 0,_c0,symbol,date,open,high,low,close,volume,adjusted,market_cap,sector,industry,exchange
0,1,TXG,2019-09-12,54,58,51,52.75,7326300,52.75,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
1,2,TXG,2019-09-13,52.75,54.355,49.150002,52.27,1025200,52.27,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
2,3,TXG,2019-09-16,52.450001,56,52.009998,55.200001,269900,55.200001,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
3,4,TXG,2019-09-17,56.209999,60.900002,55.423,56.779999,602800,56.779999,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
4,5,TXG,2019-09-18,56.849998,62.27,55.650002,62,1589600,62,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1729029,1729030,ZYME,2020-07-16,30.57,31.67,30.299999,31.15,467900,31.15,$1.44B,Health Care,Major Pharmaceuticals,NYSE
1729030,1729031,ZYME,2020-07-17,31.200001,33.080002,31,33.029999,600800,33.029999,$1.44B,Health Care,Major Pharmaceuticals,NYSE
1729031,1729032,ZYME,2020-07-20,33.32,33.32,31.589001,32.110001,303500,32.110001,$1.44B,Health Care,Major Pharmaceuticals,NYSE
1729032,1729033,ZYME,2020-07-21,32.369999,32.490002,30.34,30.65,337900,30.65,$1.44B,Health Care,Major Pharmaceuticals,NYSE


In [23]:
# structing our data
from pyspark.sql.types import *

df_schema = [
               StructField('_c0', IntegerType(), True),
               StructField('symbol', StringType(), True),
               StructField('data', DateType(), True),
               StructField('open', DoubleType(), True),
               StructField('high', DoubleType(), True),
               StructField('low', DoubleType(), True),
               StructField('close', DoubleType(), True),
               StructField('volume', IntegerType(), True),
               StructField('adjusted', DoubleType(), True),
               StructField('market_cap', StringType(), True),
               StructField('sector', StringType(), True),
               StructField('industry', StringType(), True),
               StructField('exchange', StringType(), True),
            ]

final_struc = StructType(fields = df_schema)

df = spark.read.csv(
    path,
    sep=',',
    header=True,
    schema=final_struc 
)

df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- symbol: string (nullable = true)
 |-- data: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjusted: double (nullable = true)
 |-- market_cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



Various data verification methods:
schema, dtypes, show, head, first, take, describe, columns, count, distinct, printSchema

In [24]:
# statistic information
import pyspark.sql.functions as f
num_col = [f.name for f in df.schema.fields if isinstance(f.dataType,
                                                                  DoubleType) and
                  isinstance(f.dataType,IntegerType)]

df.describe(num_col).toPandas()

23/12/18 16:22:04 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/12/18 16:22:04 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , symbol, open, high, low, close, volume, adjusted, market.cap, sector, industry, exchange
 Schema: _c0, symbol, open, high, low, close, volume, adjusted, market_cap, sector, industry, exchange
Expected: _c0 but found: 
CSV file: file:///kaggle/input/usstockprices/stocks_price_final.csv
                                                                                

Unnamed: 0,summary,_c0,symbol,open,high,low,close,volume,adjusted,market_cap,sector,industry,exchange
0,count,1729034.0,1729034,1726301.0,1726301.0,1726301.0,1726301.0,1725207.0,1726301.0,1729034,1729034,1729034,1729034
1,mean,864517.5,,15070.071703341053,15555.067268137089,14557.808227578991,15032.714854330708,1397692.162788581,14926.1096887955,,,,
2,stddev,499129.2670065542,,1111821.8002863151,1148247.1953514942,1072968.155843425,1109755.929400066,5187522.908169128,1101877.6328940208,,,,
3,min,1.0,A,0.072,0.078,0.052,0.071,0.0,-1.230099,$1.01B,Basic Industries,Accident &Health Insurance,NASDAQ
4,max,1729034.0,ZYXI,160168176.0,161601456.0,155151728.0,158376592.0,656504200.0,157249392.0,$9B,Transportation,Wholesale Distributors,NYSE


In [25]:
# max and min for 'data'

df.agg(f.max(df.data), f.min(df.data)).show()

23/12/18 16:22:30 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: date
 Schema: data
Expected: data but found: date
CSV file: file:///kaggle/input/usstockprices/stocks_price_final.csv

+----------+----------+
| max(data)| min(data)|
+----------+----------+
|2020-07-22|2019-01-02|
+----------+----------+



                                                                                

In [26]:
from pyspark.sql.functions import col, count, isnan, when, isnull, replace, mean, lit
# inspection for pass
columns = [f.name for f in df.schema.fields]
#df = spark.createDataFrame(df, columns)
missing_values_count = df.select([count(when(isnull(c) | (col(c) == ''), c)).alias(c) for c in df.columns])

missing_values_count.show()                                  

23/12/18 16:22:34 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , symbol, date, open, high, low, close, volume, adjusted, market.cap, sector, industry, exchange
 Schema: _c0, symbol, data, open, high, low, close, volume, adjusted, market_cap, sector, industry, exchange
Expected: _c0 but found: 
CSV file: file:///kaggle/input/usstockprices/stocks_price_final.csv

+---+------+----+----+----+----+-----+------+--------+----------+------+--------+--------+
|_c0|symbol|data|open|high| low|close|volume|adjusted|market_cap|sector|industry|exchange|
+---+------+----+----+----+----+-----+------+--------+----------+------+--------+--------+
|  0|     0|   0|2733|2733|2733| 2733|  3827|    2733|         0|     0|       0|       0|
+---+------+----+----+----+----+-----+------+--------+----------+------+--------+--------+



                                                                                

In [27]:
# replace NaN to mean for 'open'
column_to_fill = 'open'
mean_value_open = df.select(f.mean(df[column_to_fill])).collect()[0][0]
df_filled = df.fillna(mean_value_open, subset=[column_to_fill])
missing_values_count = df_filled.select([f.count(f.when(f.isnull(c) | (f.col(c) == ''), c)).alias(c) for c in df_filled.columns])
missing_values_count.show()

23/12/18 16:22:41 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , symbol, date, high, low, close, volume, adjusted, market.cap, sector, industry, exchange
 Schema: _c0, symbol, data, high, low, close, volume, adjusted, market_cap, sector, industry, exchange
Expected: _c0 but found: 
CSV file: file:///kaggle/input/usstockprices/stocks_price_final.csv

+---+------+----+----+----+----+-----+------+--------+----------+------+--------+--------+
|_c0|symbol|data|open|high| low|close|volume|adjusted|market_cap|sector|industry|exchange|
+---+------+----+----+----+----+-----+------+--------+----------+------+--------+--------+
|  0|     0|   0|   0|2733|2733| 2733|  3827|    2733|         0|     0|       0|       0|
+---+------+----+----+----+----+-----+------+--------+----------+------+--------+--------+



                                                                                

In [28]:
# price information for January 2020
df.filter((col('data')>= lit('2020-01-01')) & (col('data')<=lit('2020-01-31'))
         ).toPandas()

23/12/18 16:22:45 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , symbol, date, open, high, low, close, volume, adjusted, market.cap, sector, industry, exchange
 Schema: _c0, symbol, data, open, high, low, close, volume, adjusted, market_cap, sector, industry, exchange
Expected: _c0 but found: 
CSV file: file:///kaggle/input/usstockprices/stocks_price_final.csv
                                                                                

Unnamed: 0,_c0,symbol,data,open,high,low,close,volume,adjusted,market_cap,sector,industry,exchange
0,78,TXG,2020-01-02,76.910004,77.989998,71.480003,72.830002,220200.0,72.830002,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
1,79,TXG,2020-01-03,71.519997,76.188004,70.580002,75.559998,288300.0,75.559998,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
2,80,TXG,2020-01-06,75.269997,77.349998,73.559998,75.550003,220600.0,75.550003,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
3,81,TXG,2020-01-07,76.000000,77.279999,75.320000,75.980003,182400.0,75.980003,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
4,82,TXG,2020-01-08,76.089996,76.949997,72.739998,74.839996,172100.0,74.839996,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
...,...,...,...,...,...,...,...,...,...,...,...,...,...
94612,1728911,ZYME,2020-01-27,46.419998,47.619999,43.860001,44.099998,530000.0,44.099998,$1.44B,Health Care,Major Pharmaceuticals,NYSE
94613,1728912,ZYME,2020-01-28,44.040001,44.880001,42.654999,43.540001,351800.0,43.540001,$1.44B,Health Care,Major Pharmaceuticals,NYSE
94614,1728913,ZYME,2020-01-29,44.540001,47.500000,44.160000,47.259998,380800.0,47.259998,$1.44B,Health Care,Major Pharmaceuticals,NYSE
94615,1728914,ZYME,2020-01-30,47.200001,47.259998,44.000000,44.270000,451600.0,44.270000,$1.44B,Health Care,Major Pharmaceuticals,NYSE


In [29]:
# slice 'adjusted' betweeen 100 and 500
df.filter(df.adjusted.between(100.0, 500.0)).toPandas()

23/12/18 16:22:50 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , symbol, date, open, high, low, close, volume, adjusted, market.cap, sector, industry, exchange
 Schema: _c0, symbol, data, open, high, low, close, volume, adjusted, market_cap, sector, industry, exchange
Expected: _c0 but found: 
CSV file: file:///kaggle/input/usstockprices/stocks_price_final.csv
                                                                                

Unnamed: 0,_c0,symbol,data,open,high,low,close,volume,adjusted,market_cap,sector,industry,exchange
0,93,TXG,2020-01-24,95.459999,101.000000,94.157997,100.790001,328100.0,100.790001,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
1,94,TXG,2020-01-27,99.760002,104.892998,97.019997,103.209999,334900.0,103.209999,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
2,95,TXG,2020-01-28,104.620003,108.269997,103.297997,106.620003,245400.0,106.620003,$9.31B,Capital Goods,Biotechnology: Laboratory Analytical Instruments,NASDAQ
3,6893,ABMD,2019-01-02,315.940002,320.709991,307.029999,309.959991,590000.0,309.959991,$13.39B,Health Care,Medical/Dental Instruments,NASDAQ
4,6894,ABMD,2019-01-03,307.250000,311.739990,293.660004,302.290009,665300.0,302.290009,$13.39B,Health Care,Medical/Dental Instruments,NASDAQ
...,...,...,...,...,...,...,...,...,...,...,...,...,...
142627,1727854,ZTS,2020-07-16,142.600006,142.960007,141.470001,142.940002,942000.0,142.940002,$68.92B,Health Care,Major Pharmaceuticals,NYSE
142628,1727855,ZTS,2020-07-17,143.800003,145.279999,142.229996,144.660004,1518100.0,144.660004,$68.92B,Health Care,Major Pharmaceuticals,NYSE
142629,1727856,ZTS,2020-07-20,145.089996,146.690002,144.320007,145.970001,1173700.0,145.970001,$68.92B,Health Care,Major Pharmaceuticals,NYSE
142630,1727857,ZTS,2020-07-21,146.500000,147.000000,144.169998,145.009995,1659600.0,145.009995,$68.92B,Health Care,Major Pharmaceuticals,NYSE


In [30]:
# creating of conditions for 'adjust'
df_fill = df.select('open', 'close', 
                    f.when(df.adjusted >= 200.0, 1).otherwise(0).
                    alias('adjust')
                   )
df_fill.toPandas()

                                                                                

Unnamed: 0,open,close,adjust
0,54.000000,52.750000,0
1,52.750000,52.270000,0
2,52.450001,55.200001,0
3,56.209999,56.779999,0
4,56.849998,62.000000,0
...,...,...,...
1729029,30.570000,31.150000,0
1729030,31.200001,33.029999,0
1729031,33.320000,32.110001,0
1729032,32.369999,30.650000,0


In [31]:
# fillter for 'adjust' >=200, =1, another = 0
choice = 0 # put 1 or 0
df_fill_1 = df_fill.filter(col('adjust')==choice).show()

+---------+---------+------+
|     open|    close|adjust|
+---------+---------+------+
|     54.0|    52.75|     0|
|    52.75|    52.27|     0|
|52.450001|55.200001|     0|
|56.209999|56.779999|     0|
|56.849998|     62.0|     0|
|62.810001|61.119999|     0|
|61.709999|     60.5|     0|
|60.220001|60.330002|     0|
|     61.0|54.299999|     0|
|54.459999|52.759998|     0|
|52.779999|49.990002|     0|
|51.130001|51.029999|     0|
|51.049999|50.400002|     0|
|50.509998|47.029999|     0|
|46.779999|    46.07|     0|
|    46.77|48.119999|     0|
|     48.0|51.450001|     0|
|52.099998|50.360001|     0|
|     50.0|49.549999|     0|
|49.630001|50.009998|     0|
+---------+---------+------+
only showing top 20 rows



In [32]:
spark.stop()