In [1]:
from pyspark.sql import *
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
import pyspark.sql.functions as F
from IPython.display import display
from pyspark.sql.functions import to_timestamp
import pytz
from pyspark.sql.functions import *
import plotly.subplots as sp


In [2]:
# Create (or attach to) the Spark session that backs every query in this notebook.
spark = (
    SparkSession
    .builder
    .appName("test-app")
    .getOrCreate()
)

23/06/12 08:43:20 WARN Utils: Your hostname, Divyams-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.151.172.44 instead (on interface en0)
23/06/12 08:43:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/12 08:43:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, IntegerType

# Explicit schema for oildata.csv, built field by field (every column nullable).
# NOTE(review): Timestamp is read as a plain string, not TimestampType — parse it
# (e.g. with to_timestamp) if time-based ordering or arithmetic in Spark is needed.
schema = (
    StructType()
    .add("Timestamp", StringType(), nullable=True)
    .add("Well_ID", StringType(), nullable=True)
    .add("Stage", IntegerType(), nullable=True)
    .add("Engine_Oil_Pressure", DoubleType(), nullable=True)
    .add("Proppant_Concentration", DoubleType(), nullable=True)
    .add("Fluid_Rate", DoubleType(), nullable=True)
)

# Load the CSV (first row is the header) with the schema above instead of inference
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("./TestData/oildata.csv")
)

# Register the frame as the "data" view so the SQL cells below can query it
df.createOrReplaceTempView("data")

# Preview the first rows
df.show()

+-------------------+--------+-----+-------------------+----------------------+----------+
|          Timestamp| Well_ID|Stage|Engine_Oil_Pressure|Proppant_Concentration|Fluid_Rate|
+-------------------+--------+-----+-------------------+----------------------+----------+
|2023-06-08 09:00:00|Well-001|    1|              100.0|                2000.0|     500.0|
|2023-06-08 09:30:00|Well-001|    1|              105.0|                2100.0|     550.0|
|2023-06-08 10:00:00|Well-001|    1|              102.0|                2200.0|     600.0|
|2023-06-08 10:30:00|Well-001|    2|               98.0|                2300.0|     620.0|
|2023-06-08 11:00:00|Well-001|    2|              102.0|                2400.0|     650.0|
|2023-06-08 11:30:00|Well-001|    2|              100.0|                2500.0|     680.0|
|2023-06-08 12:00:00|Well-001|    3|               95.0|                2600.0|     700.0|
|2023-06-08 12:30:00|Well-001|    3|               99.0|                2700.0|     730.0|

In [4]:
# Per-stage averages of engine oil pressure and fluid rate over the "data" view,
# collected to pandas and ordered by Stage (the pandas index from collection is kept).
sql = """
SELECT Stage, AVG(Engine_Oil_Pressure) as avg_EOP, AVG(Fluid_Rate) as avg_FR
FROM data
GROUP BY Stage
"""
query = spark.sql(sql)
stage_avg_pdf = query.toPandas()
stage_avg_pdf.sort_values(by='Stage')


Unnamed: 0,Stage,avg_EOP,avg_FR
0,1,100.933333,550.0
3,2,101.166667,650.0
1,3,99.9,730.0
2,4,100.9,810.0


In [5]:
# Heatmap of a combined per-row metric, Engine_Oil_Pressure * Fluid_Rate / Proppant_Concentration,
# averaged over each (Well_ID, Stage) cell.
# NOTE(review): the physical units of this ratio are not established in this notebook — confirm before labelling.
# NOTE: rendering plotly figures via display() requires nbformat>=4.2.0 in the kernel
# environment (the saved run failed with that ValueError).
well_metrics = df.select('Engine_Oil_Pressure', 'Proppant_Concentration', 'Fluid_Rate', 'Well_ID', 'Stage')
result_df = well_metrics.toPandas()

# Zero proppant concentration becomes NaN rather than a divisor: x/0 yields +/-inf in
# pandas, which would dominate the per-cell mean and distort the heatmap colour scale.
proppant = result_df['Proppant_Concentration']
result_df['Combined_Value'] = (
    result_df['Engine_Oil_Pressure'] * result_df['Fluid_Rate']
) / proppant.where(proppant != 0)

result_pivot = result_df.pivot_table(
    index='Well_ID', columns='Stage', values='Combined_Value', aggfunc='mean'
)
fig = px.imshow(
    result_pivot,
    labels=dict(x='Stage', y='Well_ID', color='Combined_Value'),
    title='Mean Combined_Value by Well and Stage',
)
display(fig)


ValueError: Mime type rendering requires nbformat>=4.2.0 but it is not installed

In [6]:
# Per-well fluid-rate lines (left axis) overlaid with engine-oil-pressure bars (right axis).
data = df.toPandas()

# Unique well IDs, in order of first appearance
unique_well_ids = data['Well_ID'].unique()

# Map each well ID to a Pastel colour. Cycle through the palette so wells beyond its
# length still get a colour (zip() would silently drop them and leave their bars NaN).
palette = px.colors.qualitative.Pastel
color_map = {well_id: palette[i % len(palette)] for i, well_id in enumerate(unique_well_ids)}

# Colour each row's bar by its well
data['Color'] = data['Well_ID'].map(color_map)

fig = go.Figure()
# One fluid-rate line per well, drawn in the same colour as that well's bars
for well_id in unique_well_ids:
    well_data = data[data['Well_ID'] == well_id]

    scatter_trace = go.Scatter(
        x=well_data['Timestamp'],
        y=well_data['Fluid_Rate'],
        mode='lines',
        name=f'Fluid Rate ({well_id})',
        line=dict(color=color_map[well_id]),
    )
    fig.add_trace(scatter_trace)

# Bar plot for engine oil pressure on the secondary (right-hand) axis
bar_trace = go.Bar(
    x=data['Timestamp'],
    y=data['Engine_Oil_Pressure'],
    name='Engine Oil Pressure',
    yaxis='y2',
    marker=dict(color=data['Color'])
)
# Add to the figure built above — re-creating it from [scatter_trace, bar_trace]
# would keep only the last well's line and discard all the others.
fig.add_trace(bar_trace)

fig.update_layout(
    title='Well Data',
    xaxis_title='Timestamp',
    yaxis=dict(title='Fluid Rate'),
    yaxis2=dict(title='Engine Oil Pressure', overlaying='y', side='right'),
    legend=dict(x=1, y=1),
    showlegend=True
)

display(fig)

ValueError: Mime type rendering requires nbformat>=4.2.0 but it is not installed

In [7]:
# Engine oil pressure over time, one coloured line per well (uses the per-row `data` frame)
fig = px.line(
    data,
    x='Timestamp',
    y='Engine_Oil_Pressure',
    color='Well_ID',
    title='Engine Oil Pressure by Well',
)
display(fig)


ValueError: Mime type rendering requires nbformat>=4.2.0 but it is not installed

In [11]:
# Per-stage averages (same query as the earlier SQL cell), plotted with fluid rate on
# the left axis and engine oil pressure on a secondary right-hand axis.
# NOTE(review): this reassigns `data`, which earlier cells hold as the per-row frame;
# re-running the px.line cell after this one would fail, since the aggregate has no
# Timestamp column.
sql = """
SELECT Stage, AVG(Engine_Oil_Pressure) as avg_EOP, AVG(Fluid_Rate) as avg_FR
FROM data
GROUP BY Stage
"""
query = spark.sql(sql)
data = query.toPandas().sort_values(by=['Stage'])
display(data)

scatter_trace1 = go.Scatter(
    x=data['Stage'],
    y=data['avg_FR'],
    mode='markers',
    name='Fluid Rate and Stage',
)
# Average EOP sits on a much smaller scale than average fluid rate (~100 vs ~550-810
# in the saved output), so plot it against the secondary axis declared in the layout.
scatter_trace2 = go.Scatter(
    x=data['Stage'],
    y=data['avg_EOP'],
    mode='markers',
    name='Engine Oil Pressure and Stage',
    yaxis='y2',
)
fig = go.Figure(data=[scatter_trace1, scatter_trace2])
fig.update_layout(
    title='Well Data',
    xaxis_title='Stage',
    yaxis=dict(title='Fluid Rate'),
    yaxis2=dict(title='Engine Oil Pressure', overlaying='y', side='right'),
    legend=dict(x=1, y=1),
    showlegend=True
)
display(fig)


   Stage     avg_EOP  avg_FR
0      1  100.933333   550.0
3      2  101.166667   650.0
1      3   99.900000   730.0
2      4  100.900000   810.0


ValueError: Mime type rendering requires nbformat>=4.2.0 but it is not installed