In [85]:
import pandas as pd

from multiprocessing import Pool, cpu_count
import pickle
from tqdm import tqdm

samples =pd.DataFrame.from_dict(pickle.load(open("samples.pickle", "rb")))
tweets = pickle.load(open("tweets.pickle", "rb"))

In [83]:
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, func, select, and_, text
from dotenv import load_dotenv
import os

load_dotenv()
conn = create_engine(
    f"postgresql://{os.getenv('PG_USER')}:{os.getenv('PG_PASSWORD')}@{os.getenv('PG_HOST')}/{os.getenv('PG_DBNAME')}"
)
def rolling_samples(start_day, series, window=7):
    output = []
    for i in range(window, len(series)):
        sample = series[i - window : i].tolist()
        output.append((sample, i+start_day))
    return output

print("Temp starting")
temp_readings = pd.read_sql_query(text("SELECT * FROM public.temp_readings_production WHERE day > 1460"), conn)
start_day = temp_readings.iloc[0]["day"]
print("Temp done")
samples = []
for name, group in tqdm(temp_readings.groupby("xy")):
    group = group.sort_values("day")
    temps = rolling_samples(start_day, group["temperature"].set_axis(group["day"]))
    
    for temp, i in temps:
        samples.append({
            "day": i,
            "xy": name,
            "temp": temp
        })
print("Sorting samples")
samples = sorted(samples, key=lambda x: x["day"])
print("Sorting dome")
with open("samples.pickle", "wb") as f:
    pickle.dump(samples, f)

print("Loading tweets (dont crash plz)")
tweets = pd.read_sql_query(text("SELECT * FROM public.tweets_production WHERE day > 1460"), conn)
with open("tweets.pickle", "wb") as f:
    pickle.dump(tweets, f)
print("Sampling tweets")

# samples = pickle.load(open("samples.pickle", "rb"))
# tweets = pickle.load(open("tweets.pickle", "rb"))

Temp starting
Temp done


100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 900/900 [00:23<00:00, 38.84it/s]


Sorting samples
Sorting dome
Loading tweets (dont crash plz)
Sampling tweets


In [84]:
samples

[{'day': 2490, 'xy': '(0,0)', 'temp': [44, 41, 51, 52, 50, 57, 47]},
 {'day': 2490, 'xy': '(0,1)', 'temp': [44, 54, 51, 51, 68, 52, 43]},
 {'day': 2490, 'xy': '(0,10)', 'temp': [52, 50, 55, 45, 47, 59, 53]},
 {'day': 2490, 'xy': '(0,11)', 'temp': [44, 42, 52, 50, 52, 60, 50]},
 {'day': 2490, 'xy': '(0,12)', 'temp': [44, 55, 44, 71, 66, 63, 45]},
 {'day': 2490, 'xy': '(0,13)', 'temp': [43, 39, 56, 51, 47, 57, 49]},
 {'day': 2490, 'xy': '(0,14)', 'temp': [50, 45, 49, 65, 60, 57, 59]},
 {'day': 2490, 'xy': '(0,15)', 'temp': [43, 68, 57, 80, 38, 55, 51]},
 {'day': 2490, 'xy': '(0,16)', 'temp': [49, 54, 37, 56, 56, 66, 65]},
 {'day': 2490, 'xy': '(0,17)', 'temp': [48, 35, 50, 47, 55, 54, 66]},
 {'day': 2490, 'xy': '(0,18)', 'temp': [57, 40, 46, 51, 66, 60, 53]},
 {'day': 2490, 'xy': '(0,19)', 'temp': [56, 53, 52, 57, 63, 47, 55]},
 {'day': 2490, 'xy': '(0,2)', 'temp': [50, 54, 51, 53, 45, 50, 52]},
 {'day': 2490, 'xy': '(0,20)', 'temp': [61, 51, 62, 60, 59, 51, 53]},
 {'day': 2490, 'xy': '(

In [6]:
tqdm.pandas()

In [91]:
merged_data = pd.merge(samples, tweets, how='left', on=['xy', 'day']).drop(["id", "score"], axis=1).fillna("")

In [92]:
 def aggregate_content(group):
        group['tweets'] = group['content'].tolist()
        # print(group['content'].tolist())
        return group.iloc[0]

In [93]:
merged_data[~merged_data["content"].isna()]

Unnamed: 0,day,xy,temp,content
0,2490,"(0,0)","[44, 41, 51, 52, 50, 57, 47]",
1,2490,"(0,1)","[44, 54, 51, 51, 68, 52, 43]",see me as governor now appeared as another per...
2,2490,"(0,1)","[44, 54, 51, 51, 68, 52, 43]",on her back one morning after having bathed hi...
3,2490,"(0,1)","[44, 54, 51, 51, 68, 52, 43]",gutenberg works you provide in accordance with...
4,2490,"(0,10)","[52, 50, 55, 45, 47, 59, 53]",rejoice at the appearance of the vegetables wh...
...,...,...,...,...
1002394,3571,"(9,5)","[47, 57, 39, 33, 35, 46, 38]",
1002395,3571,"(9,6)","[41, 28, 35, 41, 53, 43, 37]",
1002396,3571,"(9,7)","[38, 40, 49, 48, 42, 40, 36]",
1002397,3571,"(9,8)","[50, 37, 39, 43, 41, 53, 49]",


In [94]:
complete_samples = merged_data.groupby(['xy', 'day'], group_keys=False).progress_apply(aggregate_content)

100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████| 973800/973800 [04:07<00:00, 3929.46it/s]


In [123]:
complete_samples =merged_data.groupby(['xy', 'day']).agg(
    # include other columns as needed
    tweets=pd.NamedAgg(column='content', aggfunc=list),
    temperature=pd.NamedAgg(column='temp', aggfunc='first')
).reset_index()

In [113]:
complete_samples[complete_samples['tweets'].apply(len) > 1].iloc[0].tweets

['their prisoners but it was impossible to make them understand anything nothing they could say to them or give them or do for them but was looked upon as going to murder them they first of all unbound them but',
 'of planks and were painted and what further convinced me was that found in it capital gun loaded',
 'heart that he should show himself so careless for if could see this radiance might it not reach',
 'and his importunity prevailed and engaged me to go in his ship as private trader to the east indies this was in the year 1694 in this voyage visited my',
 'said he be pleased to give me leave to lay down few propositions as the foundation of what have to say that we may not differ in the general principles though we may be of some differing']

In [139]:
complete_samples

Unnamed: 0,xy,day,tweets,temperature
0,"(0,0)",2490,[],"[44, 41, 51, 52, 50, 57, 47]"
1,"(0,0)",2491,[],"[41, 51, 52, 50, 57, 47, 39]"
2,"(0,0)",2492,[],"[51, 52, 50, 57, 47, 39, 52]"
3,"(0,0)",2493,[],"[52, 50, 57, 47, 39, 52, 62]"
4,"(0,0)",2494,[],"[50, 57, 47, 39, 52, 62, 71]"
...,...,...,...,...
973795,"(9,9)",3567,[],"[45, 45, 35, 20, 42, 33, 40]"
973796,"(9,9)",3568,[],"[45, 35, 20, 42, 33, 40, 47]"
973797,"(9,9)",3569,[],"[35, 20, 42, 33, 40, 47, 43]"
973798,"(9,9)",3570,[],"[20, 42, 33, 40, 47, 43, 30]"


In [125]:
with open("complete_samples.pickle", "wb") as f:
    pickle.dump(complete_samples, f)

In [140]:
complete_samples = complete_samples.to_dict(orient = 'records')


In [126]:
for i in orient:
    if i["tweets"] != ['']:
        print(i)
        break
        

{'xy': '(0,0)', 'day': 2496, 'tweets': ['their prisoners but it was impossible to make them understand anything nothing they could say to them or give them or do for them but was looked upon as going to murder them they first of all unbound them but', 'of planks and were painted and what further convinced me was that found in it capital gun loaded', 'heart that he should show himself so careless for if could see this radiance might it not reach', 'and his importunity prevailed and engaged me to go in his ship as private trader to the east indies this was in the year 1694 in this voyage visited my', 'said he be pleased to give me leave to lay down few propositions as the foundation of what have to say that we may not differ in the general principles though we may be of some differing'], 'temperature': [47, 39, 52, 62, 71, 43, 69]}


In [163]:
set(result.keys())

{'2617((2,20))',
 '3359((19,28))',
 '2914((2,25))',
 '3030((22,11))',
 '2922((29,3))',
 '3018((3,14))',
 '3268((0,3))',
 '2548((5,4))',
 '3160((18,22))',
 '3090((0,2))',
 '3398((18,7))',
 '2772((15,24))',
 '2665((20,4))',
 '3033((8,12))',
 '2885((29,24))',
 '2663((11,0))',
 '3557((6,12))',
 '2641((22,13))',
 '2751((19,20))',
 '3286((20,22))',
 '2627((20,21))',
 '3164((23,8))',
 '3096((24,5))',
 '3096((9,15))',
 '2616((28,26))',
 '3020((19,26))',
 '2623((10,15))',
 '3176((14,27))',
 '2855((16,20))',
 '3564((22,18))',
 '2532((11,6))',
 '3534((25,28))',
 '3103((14,26))',
 '3268((0,7))',
 '2767((18,10))',
 '3315((18,1))',
 '2684((11,28))',
 '3346((25,6))',
 '3570((29,24))',
 '2995((24,5))',
 '3428((15,10))',
 '2624((28,17))',
 '3119((14,13))',
 '3327((23,17))',
 '3021((29,13))',
 '3029((7,14))',
 '2546((23,10))',
 '3121((18,21))',
 '2911((23,0))',
 '2841((0,3))',
 '2925((9,17))',
 '3147((0,6))',
 '3083((28,28))',
 '2711((0,16))',
 '2723((27,22))',
 '2648((0,8))',
 '2825((4,15))',
 '2915((1

In [145]:
def dict_to_example(sample):
    # print(sample)
    return (sample["tweets"], sample["temperature"])

In [160]:
test_samples = complete_samples[:1000]
# print(test_samples[0])
result = {}
for sample in test_samples:
    key = f"{sample['day']}({sample['xy']})"
    items = [item for sublist in map(dict_to_example, [sample]) for item in sublist]
    result[key] = items
result

{'2490((0,0))': [[''], [44, 41, 51, 52, 50, 57, 47]],
 '2491((0,0))': [[''], [41, 51, 52, 50, 57, 47, 39]],
 '2492((0,0))': [[''], [51, 52, 50, 57, 47, 39, 52]],
 '2493((0,0))': [[''], [52, 50, 57, 47, 39, 52, 62]],
 '2494((0,0))': [[''], [50, 57, 47, 39, 52, 62, 71]],
 '2495((0,0))': [[''], [57, 47, 39, 52, 62, 71, 43]],
 '2496((0,0))': [['their prisoners but it was impossible to make them understand anything nothing they could say to them or give them or do for them but was looked upon as going to murder them they first of all unbound them but',
   'of planks and were painted and what further convinced me was that found in it capital gun loaded',
   'heart that he should show himself so careless for if could see this radiance might it not reach',
   'and his importunity prevailed and engaged me to go in his ship as private trader to the east indies this was in the year 1694 in this voyage visited my',
   'said he be pleased to give me leave to lay down few propositions as the foundat

In [134]:
test_samples

<map at 0x7f37e5b53880>

In [103]:
complete_samples[complete_samples["tweets"] == complete_samples["content"]] #.iloc[0]["tweets"]

Unnamed: 0_level_0,Unnamed: 1_level_0,day,xy,temp,content,tweets
xy,day,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
"(0,0)",2490,2490,"(0,0)","[44, 41, 51, 52, 50, 57, 47]",,
"(0,0)",2491,2491,"(0,0)","[41, 51, 52, 50, 57, 47, 39]",,
"(0,0)",2492,2492,"(0,0)","[51, 52, 50, 57, 47, 39, 52]",,
"(0,0)",2493,2493,"(0,0)","[52, 50, 57, 47, 39, 52, 62]",,
"(0,0)",2494,2494,"(0,0)","[50, 57, 47, 39, 52, 62, 71]",,
...,...,...,...,...,...,...
"(9,9)",3567,3567,"(9,9)","[45, 45, 35, 20, 42, 33, 40]",,
"(9,9)",3568,3568,"(9,9)","[45, 35, 20, 42, 33, 40, 47]",,
"(9,9)",3569,3569,"(9,9)","[35, 20, 42, 33, 40, 47, 43]",,
"(9,9)",3570,3570,"(9,9)","[20, 42, 33, 40, 47, 43, 30]",,


In [149]:
from transformers import pipeline
import keras
import numpy as np
import pickle


classifier = pipeline("sentiment-analysis", model="./blaze_nlp")
lstm_model = keras.models.load_model("lstm_model.h5")
scaler = pickle.load(open("scaler.pkl", "rb"))

2023-08-09 03:08:23.571413: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
Using /home/tricked/.cache/torch_extensions/py311_cu117 as PyTorch extensions root...
Detected CUDA files, patching ldflags
Failed to load CUDA kernels. Mra requires custom CUDA kernels. Please verify that compatible versions of PyTorch and CUDA Toolkit are installed: CUDA_HOME environment variable is not set. Please set it to your CUDA install root.
Xformers is not installed correctly. If you want to use memory_efficient_attention to accelerate training use the following command to install Xformers
pip install xformers.
2023-08-09 03:08:26.751081: I tensorflow/compiler/xla/stream_executor/cuda/cuda_gpu_executor.cc:995] successful NUMA node read from SysFS had negative value (-1)

In [150]:
from typing import Union
import numpy as np


def derive_sentiment(model_output: {"label": str, "score": float}) -> float:
    """
    Derives the sentiment score from the model's output.

    :param model_output: A dictionary containing the model's classification label and score.
    :return: A sentiment score based on the model's output. Positive for 'yes_fire', negative for 'no_fire'.
    """

    return model_output["score"] * (1 if model_output["label"] == "yes_fire" else -1)


def predict_fire_from_temp(temperatures: Union[list[int], list[list[int]]]) -> list[dict]:
    """
    Predicts fire based on the given temperatures.

    :param temperatures: A list or list of lists containing temperature values.
    :return: A list of dictionaries containing the prediction labels ('no_fire' or 'yes_fire') and scores.
    """

    temperatures = np.array([temperatures]).reshape((-1, 7))
    temperatures = scaler.transform(temperatures)
    temperatures = temperatures.reshape(
        (temperatures.shape[0], temperatures.shape[1], 1)
    )

    # predict the fire
    predictions = lstm_model.predict(temperatures, verbose=0)
    output = []
    for prediction in list(predictions):
        prediction_result = np.argmax(prediction)
        label = ["no_fire", "yes_fire"][prediction_result]
        score = prediction[prediction_result]

        output.append({"label": label, "score": score})

    return output


def predict_fire(
    tweets: Union[list[str], list[list[str]]],
    temperatures: Union[list[int], list[list[int]]],
) -> list[float]:
    """
    Predicts fire based on the given tweets and temperatures.

    :param tweets: A list or list of lists containing tweets. Can be an empty list.
    :param temperatures: A list or list of lists containing temperature values.
    :return: A list of sentiment scores combining the information from tweets and temperatures.
             If tweets is an empty list, the function will return predictions based solely on temperatures.
    """

    if not isinstance(temperatures[0], list):
        temperatures = [temperatures]

    if not tweets:
        # Handle case when tweets is an empty list
        flat_temperatures = [temp for temp_batch in temperatures for temp in temp_batch]
        temperature_fire = predict_fire_from_temp(flat_temperatures)
        return [derive_sentiment(temp) for temp in temperature_fire]

    if len(tweets) and not isinstance(tweets[0], list):
        tweets = [tweets]

    # Flattening tweets and storing their batch indices
    flat_tweets = []
    tweet_batch_indices = [0]
    for tweet_batch in tweets:
        flat_tweets.extend(tweet_batch)
        tweet_batch_indices.append(len(flat_tweets))

    # Flattening temperatures and storing their batch indices
    flat_temperatures = [temp for temp_batch in temperatures for temp in temp_batch]
    temperature_batch_indices = [0]
    for i in range(len(temperatures)):
        temperature_batch_indices.append(
            temperature_batch_indices[-1] + len(temperatures[i])
        )

    # Get predictions for the flattened tweets and temperatures
    tweet_fire = classifier(flat_tweets)
    temperature_fire = predict_fire_from_temp(flat_temperatures)

    output = []

    # Process predictions based on indices
    for i in range(len(temperatures)):
        tweet_batch_start, tweet_batch_end = (
            tweet_batch_indices[i],
            tweet_batch_indices[i + 1],
        )
        tweet_batch_result = tweet_fire[tweet_batch_start:tweet_batch_end]
        temperature_batch_result = temperature_fire[i]

        average_tweet_sentiment = 0
        for tweet in tweet_batch_result:
            sentiment = derive_sentiment(tweet)
            average_tweet_sentiment += sentiment * (0.2 if sentiment < 0 else 1)
        average_tweet_sentiment = round(average_tweet_sentiment)

        temperature_sentiment = derive_sentiment(temperature_batch_result)

        if not average_tweet_sentiment:
            output.append(temperature_sentiment)
        else:
            output.append(average_tweet_sentiment * 0.4 + temperature_sentiment * 0.6)

    return output
    

In [153]:
data = result

In [157]:
# Define batch size
batch_size = 128

# Split the data into batches
keys = list(data.keys())
batched_tweets = []
batched_temperatures = []
for i in tqdm(range(0, len(keys), batch_size)):
    batch_tweets = [data[key][0] for key in keys[i:i+batch_size]]
    batch_temperatures = [data[key][1] for key in keys[i:i+batch_size]]
    batched_tweets.append(batch_tweets)
    batched_temperatures.append(batch_temperatures)

# Call predict_fire on each batch and map the outputs to the keys
results = {}
for i in tqdm(range(len(batched_tweets))):
    predictions = predict_fire(batched_tweets[i], batched_temperatures[i])
    for j, key in enumerate(keys[i*batch_size : (i+1)*batch_size]):
        results[key] = predictions[j]

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 8/8 [00:00<00:00, 10621.85it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 8/8 [00:36<00:00,  4.51s/it]


In [158]:
results


{'2490((0,0))': -0.9999999403953552,
 '2491((0,0))': -0.9999999403953552,
 '2492((0,0))': -0.9999999403953552,
 '2493((0,0))': -0.9999999403953552,
 '2494((0,0))': -0.9999999403953552,
 '2495((0,0))': -0.9999999403953552,
 '2496((0,0))': -0.9999999642372132,
 '2497((0,0))': -0.9999999403953552,
 '2498((0,0))': -0.9999999403953552,
 '2499((0,0))': -0.9999999403953552,
 '2500((0,0))': -1.3999999642372132,
 '2501((0,0))': -0.9999999403953552,
 '2502((0,0))': -0.9999999403953552,
 '2503((0,0))': -0.9999999403953552,
 '2504((0,0))': -0.9999999403953552,
 '2505((0,0))': -0.9999999403953552,
 '2506((0,0))': -0.9999999403953552,
 '2507((0,0))': -0.9999999403953552,
 '2508((0,0))': -0.9999999403953552,
 '2509((0,0))': -0.9999999642372132,
 '2510((0,0))': -0.9999999403953552,
 '2511((0,0))': -0.9999999403953552,
 '2512((0,0))': -0.9999999403953552,
 '2513((0,0))': -0.9999999403953552,
 '2514((0,0))': -0.9999999403953552,
 '2515((0,0))': -0.9999999403953552,
 '2516((0,0))': -0.9999999403953552,
 

In [159]:
data["2582((0,0))"]

[[''], [106, 89, 100, 105, 88, 99, 117]]

In [164]:
# Define environment variables
host = os.getenv("MEMPHIS_HOSTNAME")
username = os.getenv("MEMPHIS_USERNAME")
password = os.getenv("MEMPHIS_PASSWORD")
account_id = os.getenv("MEMPHIS_ACCOUNT_ID")


async def egress(station_name, msg):
    try:
        memphis = Memphis()
        await memphis.connect(host=host, username=username, password=password, account_id=account_id)
        producer = await memphis.producer(station_name=f"{station_name}",
                                          producer_name=f"{station_name}-producer")  # you can send the message parameter as dict as well
        await producer.produce(bytearray(json.dumps(msg), "utf-8"))

    except (MemphisError, MemphisConnectError, MemphisHeaderError, MemphisSchemaError) as e:
        print(e)

    finally:
        await memphis.close()


In [179]:
for k, v in results.items():
    day, xy = parse_key(k)
    print(day, xy, v)

2490 0,0 -0.9999999403953552
2491 0,0 -0.9999999403953552
2492 0,0 -0.9999999403953552
2493 0,0 -0.9999999403953552
2494 0,0 -0.9999999403953552
2495 0,0 -0.9999999403953552
2496 0,0 -0.9999999642372132
2497 0,0 -0.9999999403953552
2498 0,0 -0.9999999403953552
2499 0,0 -0.9999999403953552
2500 0,0 -1.3999999642372132
2501 0,0 -0.9999999403953552
2502 0,0 -0.9999999403953552
2503 0,0 -0.9999999403953552
2504 0,0 -0.9999999403953552
2505 0,0 -0.9999999403953552
2506 0,0 -0.9999999403953552
2507 0,0 -0.9999999403953552
2508 0,0 -0.9999999403953552
2509 0,0 -0.9999999642372132
2510 0,0 -0.9999999403953552
2511 0,0 -0.9999999403953552
2512 0,0 -0.9999999403953552
2513 0,0 -0.9999999403953552
2514 0,0 -0.9999999403953552
2515 0,0 -0.9999999403953552
2516 0,0 -0.9999999403953552
2517 0,0 -0.9999999403953552
2518 0,0 -0.9999999403953552
2519 0,0 -0.9999999403953552
2520 0,0 -0.9999999403953552
2521 0,0 -0.9999999403953552
2522 0,0 -0.9999999403953552
2523 0,0 -0.9999999403953552
2524 0,0 -0.99

In [178]:
def parse_key(s: str):
    day_str, _, xy_str = s.split('(')
    day = int(day_str)
    xy =  xy_str.strip(')')  # Include the parentheses in the coordinate string
    return day, xy