From 39437ad9c7036e4ec306ba79ab2786e8135b9469 Mon Sep 17 00:00:00 2001 From: hsubbaraj-spiral <87333321+hsubbaraj-spiral@users.noreply.github.com> Date: Fri, 2 Jun 2023 18:32:38 +0530 Subject: [PATCH] Added notebook for running Simple Model with large data (#1368) --- .../Simple_Model_Large_Data.ipynb | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 examples/system-tests/Simple_Model_Large_Data.ipynb diff --git a/examples/system-tests/Simple_Model_Large_Data.ipynb b/examples/system-tests/Simple_Model_Large_Data.ipynb new file mode 100644 index 000000000..ca0ae5d62 --- /dev/null +++ b/examples/system-tests/Simple_Model_Large_Data.ipynb @@ -0,0 +1,119 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "bc7d2d08", + "metadata": {}, + "outputs": [], + "source": [ + "import aqueduct as aq\n", + "from aqueduct.constants.enums import ArtifactType" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9460bb78", + "metadata": {}, + "outputs": [], + "source": [ + "client = aq.Client(api_key=\"\", aqueduct_address=\"\")\n", + "\n", + "\n", + "aq.global_config({'engine':'databricks_resource', 'lazy':True})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "eb9e417e", + "metadata": {}, + "outputs": [], + "source": [ + "#This is working with large data > 50GB.\n", + "snowflake_warehouse = client.resource('snowflake_resource')\n", + "hotel_reviews = snowflake_warehouse.sql('SELECT * FROM large_hotel_reviews;')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fa652f83", + "metadata": {}, + "outputs": [], + "source": [ + "@aq.op(requirements=[])\n", + "def dummy(original_df, num_rows):\n", + " from pyspark.sql.functions import monotonically_increasing_id\n", + " from pyspark.sql.functions import rand\n", + " import math\n", + " \n", + " original_row_count = original_df.count()\n", + " num_partitions = int(math.ceil(num_rows / original_row_count))\n", + "\n", + " # Step 2: Repartition the DataFrame\n", + " replicated_df = original_df.repartition(num_partitions)\n", + "\n", + " # Step 3: Persist the DataFrame\n", + " replicated_df.persist()\n", + "\n", + " # Step 4: Duplicate the rows\n", + " while replicated_df.count() < num_rows:\n", + " replicated_df = replicated_df.union(replicated_df)\n", + "\n", + " print(replicated_df.count())\n", + "\n", + " return replicated_df\n", + "\n", + "generated_df = dummy(hotel_reviews, 10000000000)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "324fb630", + "metadata": {}, + "outputs": [], + "source": [ + "snowflake_warehouse.save(generated_df, table_name=\"large_hotel_reviews\", update_mode=\"replace\")\n", + "\n", + "\n", + "client.publish_flow(\n", + " \"Creating_Large_Dataset\",\n", + " \"repartition hotel_reviews to create big dataset\",\n", + " artifacts=[generated_df],\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7c9e16e2", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.8" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}